Rose手册第一章:入门指引

ROSE

1.1 简介: 人人网、糯米网释出的、开源的高效Java web开发框架。在小米米聊服务端再次被验证和使用。一个从零开始的创业公司,在大家技术背景不一的情况下,rose很简单快速地传达到了大家中间。本手册致力于让php开发人员也能快速使用上java开发高性能服务。 1)rose是什么?

基于IoC容器 (使用Spring 2.5.6).
收集最佳实践,形成规范和惯例,引导按规范惯例,简便开发.
收集通用功能,形成一些可使用的组件,提高生产效率.
特性的插拔,使用基于组合而非继承的设计.
提供可扩展的点,保持框架的可扩展性.
注重使用简易性的同时,注重内部代码设计和实现.

如果你是一个创业公司在选择php还是java,同时如果你的团队有一个人写过一年java其他人都没写过。如果你想选择一个更加大型的系统框架,请使用rose,它收集了来自人人网、糯米网、小米科技的众多工程师的经验,你可以免费拥有这些。

2)rose能做什么? 初级rose用户: rose可以用来完成一个网站。 中级rose用户: rose可以用来完成一个大型网站,它提供的jade功能使得你的项目可以快速开发,自然切入连接池;它提供的portal功能,可以将一个网页分多个线程发起向DB的请求,节省用户的时间;它提供的pipe功能类似的bigpipe,让前端加速,与此同时还有portal多线程的优势。 高级rose用户: rose可以加入spring任何特性,比如定时执行(去TM的crontab);比如拦截器做统一权限控制。可以配置主库从库,分表规则。配置thrift、zookeeper可以得到牛B的高可用性高性能服务集群。

1.2 简明教程: 下面开始来进入rose框架下的开发。只需要有一个感性的认识即可。下一章里会专门详细的手把手教你搭建hello world项目。 1)需要些什么? 提前学习什么是maven:简单地说,maven是个build工具,用一个pom.xml来定义项目的依赖。通过一个命令mvn,可以地build compile(知道make吧,类似,或者类似ant)。
也许还需要一个nexus,用来搭建自己的maven仓库(这些都是门槛啊,知道为什么java用的人在全球多,而在php的人似乎更多,因为我们的基础设施太落后了)。nexus的作用是配合maven工作。(54chen正在向sonatype申请将rose项目push到sonatype的官方库中,成功后这将省略掉这一步)

然后需要在你的项目的pom文件中添加:

  1. <dependency>  
  2.         <groupId>net.paoding</groupId>  
  3.         <artifactId>paoding-rose</artifactId>  
  4.         <version>1.0-SNAPSHOT</version>  
  5.     </dependency>  
  6.     <dependency>  
  7.         <groupId>net.paoding</groupId>  
  8.         <artifactId>paoding-rose-jade</artifactId>  
  9.         <version>1.1-SNAPSHOT</version>  
  10.     </dependency>  
  11.     <dependency>  
  12.         <groupId>net.paoding</groupId>  
  13.         <artifactId>paoding-rose-scanning</artifactId>  
  14.         <version>1.0-SNAPSHOT</version>  
  15.     </dependency>  
这是三个最基础的框架包。

2)一个controller长什么样?

  1. @Path("/")  
  2. public class TestController {  
  3.     @Get("hello")  
  4.     public String test(){  
  5.         return "@a";  
  6.     }  
  7. }  

http://localhost/hello

将会返回:a。就是这么简单。

下一节预告:rose手册第二章:配置与使用

Senseidb使用手记

linkedin, senseidb 下载 https://github.com/downloads/linkedin/sensei/sensei-1.0.0-release.tar.gz
这个版本是2012年1月份发布的,如果您看到此文时时间太久,请到他们的官方网站去寻找:http://senseidb.com

下载kafka kafka:由linkedin开源的高吞吐量的消息系统。
http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

下载中文分词包IKAnalyzer http://code..com/p/ik-analyzer/downloads/list

编译sensei tar zxvf sensei-1.0.0-release.tar.gz
cd sensei-trunk
./bin/build.sh 或者 mvn package 要是没有maven客户端,需要自行安装(ubuntu下直接apt-get install maven)

编译kafka tar zxvf kafka-0.7.0-incubating-src.tar.gz
cd kafka-0.7.0-incubating-src/
./sbt
update
pacakge

编译IKA 新建一个jar包,里面只有一个类,内容如下:

  1. public class IKAnalyzerFactory implements SenseiPluginFactory<IKAnalyzer> {    
  2.     @Override    
  3.     public IKAnalyzer getBean(Map<String, String> initProperties, String fullPrefix, SenseiPluginRegistry pluginRegistry) {    
  4.         return new IKAnalyzer();    
  5.     }    
  6. }    
配置sensei的sensei.properties 重要的几点:
1)zookeeper的url: sensei.cluster.url=zookeeper.n.miliao.com:2181 2)IKA的class设置: sensei.index.analyzer.class = com.chen.IKA.IKAnalyzerFactory 3)kafka所使用的声明: sensei.gateway.class=com.senseidb.gateway.kafka.KafkaDataProviderBuilder
sensei.gateway.kafka.zookeeperUrl=zookeeper.n.miliao.com
#下面的topic很关键,消息系统中用topic区别不同的消息 sensei.gateway.kafka.topic=hotTopic #ProducerData生成时,也应该叫hotTopic

启动sensei 略去了设置schema,可参考http://www.54chen.com/java-ee/scalable-reltime-search-senseidb.html.

cd sensei-trunk
bin/start-sensei-node.sh example/xxx/conf

(所有的使用的jar,比如说kafka\IKA及其依赖的包,都扔到conf/ext下)

配置kafka的server.properties 关键的一点:
1)设置kafka使用的zookeeper地址:

zk.connect=zookeeper.n.miliao.com:2181

启动kafka

cd kafka-0.7.0-incubating-src
nohup bin/kafka-server-start.sh config/server.properties &

[如果你的schema定义无误,那么到这里你就可以通知kafka有消息进入,sensei就会自动开始消费这些消息进行索引等过程]

在具体业务中传入数据参考前文:http://www.54chen.com/java-ee/scalable-reltime-search-senseidb.html

发起搜索

  1. public static JSONArray doSearch(String key, int limit, int offset, String senseiServer) throws JSONException {  
  2.         SenseiClientRequest senseiRequest = SenseiClientRequest.builder().query(Queries.stringQuery(key)).paging(limit, offset)  
  3.                 .fetchStored(true).build();   
  4.         SenseiResult senseiResult = new SenseiServiceProxy(senseiServer, 8080).sendSearchRequest(senseiRequest);  
  5.         List<SenseiHit> list = senseiResult.getHits();  
  6.         JSONArray l = new JSONArray();  
  7.         for (int i = 0; i < list.size(); i++) {  
  8.             SenseiHit hit = list.get(i);  
  9.             l.put(new JSONObject(hit.getSrcdata()));  
  10.         }  
  11.         return l;  
  12.     }  

____EOF____
2012.3.13 version 1

分布式实时搜索方案介绍-senseidb

linkedin senseidb 名词解释 zoie:由linkedin开源的建立在lucene之上提供实时索引的系统。它利用两 个内存索引一个硬盘索引来实现实时搜索。
bobo-browse:由linkedin开源的基于lucene的分类浏览搜索系统。
zookeeper:一个分布式的,源码的分布式应用程序协调服务,常用来做配置服务。
senseidb:开源,分布式,实时,半结构化的数据库(官方网站上如是说)。实际上是一个将zoie、bobo-browse、zookeeper整合起来,提供各种方便的使用办法的一个项目。项目目标是达到简单易用的分布式实时搜索系统。
kafka:由linkedin开源的高吞吐量的消息系统。
norbert:norbert是一个提供分布式集群服务的开发框架,具备集群管理功能,对开发简单的通信架构,易扩展能承受高吞吐量的框架。scala实现,java无缝使用。其原理是:netty+zookeeper+pb。
IKAnalyzer:中文分词较好用的一个。
lucene:这个不用说了。

使用senseidb 1.解决中文分词问题
senseidb支持在配置上进行自定义analyzer。
要做的事情就是,建立一个jar包,里面只要一个类即可,依赖IKA的包。代码如下:

  1. public class IKAnalyzerFactory implements SenseiPluginFactory<IKAnalyzer> {  
  2.     @Override  
  3.     public IKAnalyzer getBean(Map<String, String> initProperties, String fullPrefix, SenseiPluginRegistry pluginRegistry) {  
  4.         return new IKAnalyzer();  
  5.     }  
  6. }  

将此jar包放于sensei/conf/ext目录下,修改定义文件sensei.properties:
sensei.index.analyzer.class = 上述jar包的全packagename classname

2.使用kafka数据源
在senseidb中有个叫gateway的概念,定义了数据源(实时的写入删除等)。
修改定义文件sensei.properties:
sensei.gateway.class=com.senseidb.gateway.kafka.KafkaDataProviderBuilder
这个class存在于sensei-trunk/sensei-gateway,打包扔进conf/ext即可。

在具体业务中传入数据:

  1. Properties props = new Properties();  
  2.         props.put("zk.connect""your zk server:2181");  
  3.         props.put("serializer.class""kafka.serializer.StringEncoder");  
  4.         ProducerConfig config = new ProducerConfig(props);  
  5.         Producer<String, String> producer = new Producer<String, String>(config);  
  6.   
  7.         int i = (int) (Math.random() * 10000);  
  8.   
  9.         JSONObject jo = new JSONObject();  
  10.         jo.put("thread_id", i);  
  11.         jo.put("hot_id", i + 300);  
  12.         jo.put("user_id", i + 1000);  
  13.         jo.put("type", i);  
  14.         jo.put("subject""这是一个标题" + index);  
  15.         jo.put("contents", index);  
  16.   
  17.         System.out.println(i);  
  18.   
  19.         String msg = jo.toString();  
  20.         // The message is sent to a randomly selected partition registered in ZK  
  21.         ProducerData<String, String> data = new ProducerData<String, String>("hotTopic", msg);  
  22.         producer.send(data);  
  23.   
  24.         producer.close();  

3.索引配置
conf/schema.xml文件中定义了两种结构,一个是table一个是facets。
table的column定义了每个字段。
如下的一个定义,配合了2中的写入:

  1. <table uid="thread_id">  
  2.       <column name="hot_id" type="long" />  
  3.       <column name="user_id" type="long" />  
  4.       <column name="type" type="int" />  
  5.       <column name="subject" type="sring" />  
  6.       <!-- attributes: indexed,store,termvector are only used when type is text -->  
  7.       <column name="contents" type="text" index="ANALYZED" store="YES" termvector="YES" />  
  8. </table>  

可供选择的其他分布式实时搜索方案 Katta:基于Lucene可伸缩分布式实时搜索方案,最早的方案。
Solandra:实时分布式搜索引擎,把solr与Cassandra集合在一起的一个方案。

Linkedin高吞吐量分布式消息系统kafka使用手记

linkedin kafka kafka是一种高吞吐量的分布式发布订阅消息系统,她有如下特性:

通过O(1)的磁盘数据结构提供消息的持久化,这种结构对于即使数以TB的消息存储也能够保持长时间的稳定性能。
高吞吐量:即使是非常普通的硬件kafka也可以支持每秒数十万的消息。
支持通过kafka服务器和消费机集群来分区消息。
支持Hadoop并行数据加载。
设计侧重高吞吐量,用于好友动态,相关性统计,排行统计,访问频率控制,批处理等系统。大部分的消息中间件能够处理实时性要求高的消息/数据,但是对于队列中大量未处理的消息/数据在持久性方面比较弱。

kakfa的consumer使用拉的方式工作。

安装kafka 下载:http://people.apache.org/~nehanarkhede/kafka-0.7.0-incubating/kafka-0.7.0-incubating-src.tar.gz

> tar xzf kafka-.tgz
> cd kafka- > ./sbt update
> ./sbt package
启动zkserver:
bin/zookeeper-server-start.sh config/zookeeper.properties
启动server:
bin/kafka-server-start.sh config/server.properties
就是这么简单。

使用kafka

  1. import java.util.Arrays;  
  2. import java.util.List;  
  3. import java.util.Properties;  
  4. import kafka.javaapi.producer.SyncProducer;  
  5. import kafka.javaapi.message.ByteBufferMessageSet;  
  6. import kafka.message.Message;  
  7. import kafka.producer.SyncProducerConfig;  
  8.   
  9. ...  
  10.   
  11. Properties props = new Properties();  
  12. props.put(“zk.connect”, “127.0.0.1:2181”);  
  13. props.put("serializer.class""kafka.serializer.StringEncoder");  
  14. ProducerConfig config = new ProducerConfig(props);  
  15. Producer<String, String> producer = new Producer<String, String>(config);  
  16.   
  17. Send a single message  
  18.   
  19. // The message is sent to a randomly selected partition registered in ZK  
  20. ProducerData<String, String> data = new ProducerData<String, String>("test-topic""test-message");  
  21. producer.send(data);      
  22.   
  23. producer.close();  

这样就是一个标准的producer。

consumer的代码

  1. // specify some consumer properties  
  2. Properties props = new Properties();  
  3. props.put("zk.connect""localhost:2181");  
  4. props.put("zk.connectiontimeout.ms""1000000");  
  5. props.put("groupid""test_group");  
  6.   
  7. // Create the connection to the cluster  
  8. ConsumerConfig consumerConfig = new ConsumerConfig(props);  
  9. ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);  
  10.   
  11. // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume  
  12. Map<String, List<KafkaMessageStream<Message>>> topicMessageStreams =   
  13.     consumerConnector.createMessageStreams(ImmutableMap.of("test"4));  
  14. List<KafkaMessageStream<Message>> streams = topicMessageStreams.get("test");  
  15.   
  16. // create list of 4 threads to consume from each of the partitions   
  17. ExecutorService executor = Executors.newFixedThreadPool(4);  
  18.   
  19. // consume the messages in the threads  
  20. for(final KafkaMessageStream<Message> stream: streams) {  
  21.   executor.submit(new Runnable() {  
  22.     public void run() {  
  23.       for(Message message: stream) {  
  24.         // process message  
  25.       }   
  26.     }  
  27.   });  
  28. }