This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit a539688121c9a3909af0818b9a01f5347548c256 Author: 维章 <[email protected]> AuthorDate: Mon May 23 16:09:07 2022 +0800 merge from upstream/snapshot-1.0.3 --- NOTICE | 2 +- README.md | 16 +-- docs/README.md | 142 +++++++++++++++++++++ docs/SUMMARY.md | 8 ++ ...225\264\344\275\223\346\236\266\346\236\204.md" | 33 +++++ .../2.\346\236\204\345\273\272DataStream.md" | 73 +++++++++++ .../3.\345\220\257\345\212\250DataStream.md" | 53 ++++++++ ...265\201\350\275\254\350\277\207\347\250\213.md" | 63 +++++++++ ...256\227\345\255\220\350\247\243\346\236\220.md" | 55 ++++++++ ...256\236\347\216\260\345\256\271\351\224\231.md" | 0 "docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 0 -> 44207 bytes docs/images/img.png | Bin 38684 -> 0 bytes docs/images/img_1.png | Bin 43711 -> 0 bytes docs/images/img_2.png | Bin 103151 -> 0 bytes docs/images/window.png | Bin 0 -> 241692 bytes ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 0 -> 60493 bytes ...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 0 -> 44252 bytes .../\346\211\251\345\256\271\345\211\215.png" | Bin 0 -> 56733 bytes ...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 0 -> 35766 bytes "docs/images/\347\212\266\346\200\201.png" | Bin 0 -> 47527 bytes "docs/images/\347\274\251\345\256\271.png" | Bin 0 -> 51087 bytes quick_start.md => docs/quick_start/README.md | 0 quick_start.md | 92 +++++++++---- .../window/operator/impl/SessionOperator.java | 8 ++ .../streams/window/operator/join/JoinWindow.java | 9 ++ stream_sink.md | 10 +- stream_source.md | 2 +- 27 files changed, 524 insertions(+), 42 deletions(-) diff --git a/NOTICE b/NOTICE index 086ee9fa..a347efb1 100644 --- a/NOTICE +++ b/NOTICE @@ -1,5 +1,5 @@ Apache RocketMQ -Copyright 2016-2021 The Apache Software Foundation +Copyright 2016-2022 The Apache Software Foundation This product includes software developed at The Apache Software Foundation (http://www.apache.org/). diff --git a/README.md b/README.md index 51d9a6cf..f2f475e7 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,11 @@ -# Summary - +# RocketMQ Streams +[](https://app.travis-ci.com/apache/rocketmq-streams) +[](https://app.codecov.io/gh/apache/rocketmq-streams) +[](https://github.com/apache/rocketmq-streams/releases) +[](https://www.apache.org/licenses/LICENSE-2.0.html) +[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue") +[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open") +[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ) ## [中文文档](./README-chinese.md) @@ -115,9 +121,3 @@ source .start(); ``` -======= -* [Quick Start](quick\_start.md) -* [创建实时任务数据源](stream\_source.md) -* [创建实时任务数据输出](stream\_sink.md) -* [数据处理逻辑](stream\_transform.md) ->>>>>>> 1cd2dd0291dbcab033e6773021ddca13ce819f82 diff --git a/docs/README.md b/docs/README.md new file mode 100644 index 00000000..9d5e464e --- /dev/null +++ b/docs/README.md @@ -0,0 +1,142 @@ +[](https://github.com/apache/rocketmq-streams/releases) +[](https://www.apache.org/licenses/LICENSE-2.0.html) +[](http://isitmaintained.com/project/apache/rocketmq-streams "Average time to resolve an issue") +[](http://isitmaintained.com/project/apache/rocketmq-streams "Percentage of issues still open") +[](https://twitter.com/intent/follow?screen_name=ApacheRocketMQ) + +# Features + +* 轻量级部署:可以单独部署,也支持集群部署 +* 多种类型的数据输入以及输出,source 支持 rocketmq , sink 支持db, rocketmq 等 + +# DataStream Example + +```java +import org.apache.rocketmq.streams.client.transform.DataStream; + +DataStreamSource source=StreamBuilder.dataStream("namespace","pipeline"); + + source + .fromFile("~/admin/data/text.txt",false) + .map(message->message) + .toPrint(1) + .start(); +``` + +# Maven Repository + +```xml + +<dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-clients</artifactId> + <version>1.0.0-SNAPSHOT</version> +</dependency> +``` + +# Core API + +rocketmq-stream 实现了一系列高级的API,可以让用户很方便的编写流计算的程序,实现自己的业务需求; + +## StreamBuilder + +StreamBuilder 用于构建流任务的源; + ++ [dataStream(nameSpaceName,pipelineName)]() 返回DataStreamSource实例,用于分段编程实现流计算任务; + +## DataStream API + +### Source + +DataStreamSource 是分段式编程的源头类,用于对接各种数据源, 从各大消息队列中获取数据; + ++ ```fromFile``` 从文件中读取数据, 该方法包含俩个参数 + + ```filePath``` 文件路径,必填参数 + + ```isJsonData``` 是否json数据, 非必填参数, 默认为```true``` + + ++ ```fromRocketmq``` 从rocketmq中获取数据,包含四个参数 + + ```topic``` rocketmq消息队列的topic名称,必填参数 + + ```groupName``` 消费者组的名称,必填参数 + + ```isJson``` 是否json格式,非必填参数 + + ```tags``` rocketmq消费的tags值,用于过滤消息,非必填参数 + ++ ```fromMqtt``` 从满足MQTT协议的终端读取数据, 满足边缘计算的场景,其中包含9个参数 + + ```url``` mqtt broker的地址,必填参数 + + ```clientId``` 客户端ID, 必填参数,相同的clientId有负载的作用 + + ```topic``` topic信息, 必填参数 + + ```username``` 用户名, 非必填,在mqtt端添加鉴权机制时使用 + + ```password``` 密码,非必填参数,在mqtt端添加鉴权机制时使用 + + ```cleanSession``` 是否清理session信息, 非必填,默认为true + + ```connectionTimeout``` 连接超时信息, 非必填,默认是10s + + ```aliveInterval``` 判断连接是否活跃的间隔时间,非必填,默认是60s + + ```automaticReconnect``` 连接断开后自动重连机制,非必填,默认是true + + ++ ```from``` 自定义的数据源, 通过实现ISource接口实现自己的数据源 + +### transform + +transform 允许在流计算过程中对输入源的数据进行修改,进行下一步的操作;DataStream API中包括```DataStream```,```JoinStream```, ```SplitStream```,```WindowStream```等多个transform类; + +DataStream实现了一系列常见的流计算算子 + ++ ```map``` 通过将源的每个记录传递给函数func来返回一个新的DataStream ++ ```flatmap``` 与map类似,一个输入项对应0个或者多个输出项 ++ ```filter``` 只选择func返回true的源DStream的记录来返回一个新的DStream ++ ```forEach``` 对每个记录执行一次函数func, 返回一个新的DataStream ++ ```selectFields``` 对每个记录返回对应的字段值,返回一个新的DataStream ++ ```operate``` 对每个记录执行一次自定义的函数,返回一个新的DataStream ++ ```script``` 针对每个记录的字段执行一段脚本,返回新的字段,生成一个新的DataStream ++ ```toPrint``` 将结果在控制台打印,生成新的DataStream实例 ++ ```toFile``` 将结果保存为文件,生成一个新的DataStream实例 ++ ```toMqtt``` 将结果输出到满足mqtt协议的设备中,生成一个新的DataStream实例 ++ ```toDB``` 将结果保存到数据库 ++ ```toRocketmq``` 将结果输出到rocketmq ++ ```to``` 将结果经过自定义的ISink接口输出到指定的存储 ++ ```window``` 在窗口内进行相关的统计分析,一般会与```groupBy```连用, ```window()```用来定义窗口的大小, ```groupBy()```用来定义统计分析的主key,可以指定多个 + + ```count``` 在窗口内计数 + + ```min``` 获取窗口内统计值的最小值 + + ```max``` 获取窗口内统计值得最大值 + + ```avg``` 获取窗口内统计值的平均值 + + ```sum``` 获取窗口内统计值的加和值 + + ```reduce``` 在窗口内进行自定义的汇总运算 ++ ```join``` 根据条件将俩个流进行内关联 ++ ```leftJoin``` 根据条件将俩个流的数据进行左关联 ++ ```dimJoin``` 根据条件将流与维表进行内关联,维表的数据可以来自于文件,也可以来自于数据库 ++ ```dimLeftJoin``` 根据条件将流与维表进行左关联,维表的数据可以来自于文件,也可以来自于数据库 ++ ```union``` 将俩个流进行合并 ++ ```split``` 将一个数据流按照标签进行拆分,分为不同的数据流供下游进行分析计算 ++ ```with``` with算子用来指定计算过程中的相关策略,包括checkpoint的存储策略,state的存储策略等 + +#### Strategy + +策略机制主要用来控制计算引擎运行过程中的底层逻辑,如checkpoint,state的存储方式等,后续还会增加对窗口、双流join等的控制;所有的控制策略通过```with```算子传入,可以同时传入多个策略类型; + +```java +//指定checkpoint的存储策略 +source + .fromRocketmq("TSG_META_INFO","") + .map(message->message+"--") + .toPrint(1) + .with(CheckpointStrategy.db("jdbc:mysql://XXXXX:3306/XXXXX","","",0L)) + .start(); +``` + +# 运行 + +Rocketmq-Streams 作为典型的java应用,既可以集成在业务系统里运行,也可以作为一个独立的jar包来运行; + +首先对应用的源码进行编译 + +```shell +mvn -Prelease-all -DskipTests clean install -U +``` + +然后直接通过java指令来运行 + +```shell + java -jar jarName mainClass +``` + +更多详细的案例可以看[这里](docs/SUMMARY.md) \ No newline at end of file diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md new file mode 100644 index 00000000..950f99bd --- /dev/null +++ b/docs/SUMMARY.md @@ -0,0 +1,8 @@ +# Summary + +* [Introduction](README.md) +* [Quick Start](quick_start/README.md) +* [创建实时任务数据源](stream_source/README.md) +* [创建实时任务数据输出](stream_sink/README.md) +* [数据处理逻辑](stream_transform/README.md) + diff --git "a/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" new file mode 100644 index 00000000..8e564a8c --- /dev/null +++ "b/docs/design/1.RocketMQ-streams\346\225\264\344\275\223\346\236\266\346\236\204.md" @@ -0,0 +1,33 @@ +### 总体架构 + + + +数据从RocketMQ中被RocketMQ-streams消费,经过处理最终被写回到RocketMQ。 +如果流处理任务中含有算子groupBy,则需要将数据按照Key进行分组,将分组数据写入shuffle topic。后续算子从 +shuffle topic消费。如果还涉及count之类有状态算子,那么计算时需要读写状态,在窗口触发之后将计算结果写出。 + + +### 任务并行度模型 + + + +计算实例实质上是依赖了Rocket-streams SDK的client,因此,计算实例消费的MQ依赖RocketMQ rebalance分配, +计算实例总个数也不能大于消费总MQ个数,否则将有部分计算实例处于等待状态,消费不到数据。 + +一个计算实例可以消费多个MQ,一个实例内也只有一张计算拓扑图。 + +### 状态 + + +对于有状态算子,他的状态本地依赖RocksDB加速读取,远程依赖Mysql做持久化。允许流计算任务时,可以只依赖本地存储 +RocksDB, 只需要将setLocalStorageOnly设置成true即可。这种情况下可能存在状态丢失。 + + + +### 扩缩容 + + + +当计算实例从3个缩容到2个,借助于RocketMQ的rebalance,MQ会在计算实例之间重新分配。 +Instance1上消费的MQ2和MQ3被分配到Instance2和Instance3上,这两个MQ的状态数据也需要迁移到Instance2 +和Instance3上,这也暗示,状态数据是根据源数据分片保存的;扩容则是刚好相反的过程。 diff --git "a/docs/design/2.\346\236\204\345\273\272DataStream.md" "b/docs/design/2.\346\236\204\345\273\272DataStream.md" new file mode 100644 index 00000000..af12e27a --- /dev/null +++ "b/docs/design/2.\346\236\204\345\273\272DataStream.md" @@ -0,0 +1,73 @@ +DataStreamSource中有一个PipelineBuilder,在后续构建过程中,这个PipelineBuilder会一直向后流传, +将构建过程中产生的source、stage添加进来;最后在start的时候,真正利用PipelineBuilder构建出拓扑图。 + +### source类型 + - 设置source的namespace、configureName; + - 将source保存到PipelineBuilder中; + - 将source作为source节点保存到PipelineBuilder中的ChainPipeline中; + +### ChainStage类型 + +所有的其他运算,包括map,filter,script,window都会先构建出ChainStage,然后以ChainStage的身份进入 +PipelineBuilder,参加后续构建。 + +在DataStream中一个典型的添加新算子,过程如下所示: +```java + +public DataStream script(String script) { + //将用户定义的cript转化成ChainStage + // ChainStage<?> stage = this.mainPipelineBuilder.createStage(new ScriptOperator(script)); + //将ChainStage添加到PipelineBuilder中,构建拓扑。 + this.mainPipelineBuilder.setTopologyStages(currentChainStage, stage); + //将PipelineBuilder构建成DataStream,向后传递,后续还可以用该PipelineBuilder构建拓扑 + return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, stage); +} + +``` + +### 创建ChainStage + +PipelineBuilder创建,创建过程中会设置label,并将这个ChainStage添加到PipelineBuilder持有ChainPipeline中 + +- 把ChainStage添加到pipeline中 + 在构建过程中,所有的添加算子都使用一个共同的PipelineBuilder实例,PipelineBuilder结构如图所示,他持有 + 一个ChainPipeline实例,ChainPipeline实例中含有一个ISource和多个stages,还有一个label与stage的映射关系, + 以及用于寻找下个stage的label。 +  +在createStage过程中,将chainStage加入到Pipeline中。 + +在setTopologyStages 过程中将label加入到Pipeline中; + +### 设置拓扑 +```java +public void setTopologyStages(ChainStage currentChainStage, List<ChainStage> nextStages) { + if (isBreak) { + return; + } + if (nextStages == null) { + return; + } + List<String> lableNames = new ArrayList<>(); + for (ChainStage stage : nextStages) { + lableNames.add(stage.getLabel()); + } + + if (currentChainStage == null) { + this.pipeline.setChannelNextStageLabel(lableNames); + } else { + currentChainStage.setNextStageLabels(lableNames); + for (ChainStage stage : nextStages) { + stage.getPrevStageLabels().add(currentChainStage.getLabel()); + } + } + } +``` + +如果是首个ChainStage,则设置下一跳的label;如果不是首个,需要将下个stage的label设置进入当前stage。 +同时,下个stage也需要设置前一个stage的label标签。形成双向链表的结构。 + + + + + + diff --git "a/docs/design/3.\345\220\257\345\212\250DataStream.md" "b/docs/design/3.\345\220\257\345\212\250DataStream.md" new file mode 100644 index 00000000..91f91f2f --- /dev/null +++ "b/docs/design/3.\345\220\257\345\212\250DataStream.md" @@ -0,0 +1,53 @@ +### Start流程 + +流式计算在运行时可以拉起多个相同实例进行扩容,所以不能直接启动上述已经构建好的拓扑图,需要将上述构建好的拓扑 +图保存起来,需要扩容时,直接拿出算子的副本,实例化启动即可。 + +### 统一管理点 + +- 加载统一管理点IConfigurableService; + + 三种方式存储:Memory, db, file + +- PipelineBuilder的build方法,将构建构成中保存起来的IConfigurable,source和statge都是IConfigurable, + 保存到IConfigurableService中; + +- IConfigurableService的refreshConfigurable方法; + + 1.主要做的事可以概括:从统一管理点加载出组件,赋值,init,在调用后置方法doProcessAfterRefreshConfigurable。 + + 2.ChainPipeline的后置方法比较特殊,会调用pipeline中各个组件的后置方法,如果这个组件是普通UDFChainStage, + 那么将会反序列化,实例成StageBuilder。如果是WindowChainStage,会讲用户数据接收的window实例化出来。 + + 3.从IConfigurable中加载实例副本出来; + + 4.将实例副本赋值; + + 5.初始化实例副本,实例都是AbstractConfigurable的继承类,调用他的的init方法。比如在初始化rocketmqSource + 的时候,就会在此时调用init方法,先于启动方法调用; + + 6.调用IConfigurable的doProcessAfterRefreshConfigurable方法,目前只有ChainPipeline会调用, + (典型的是ChainPipeline),会在此方法中构建label与stage映射的stageMap;设置source;再调用 + ChainPipeline中各个stage的doProcessAfterRefreshConfigurable方法; + + 7.这里ChainPipeline的stage都是UDFChainStage类似。UDFChainStage的 + doProcessAfterRefreshConfigurable方法会将之前序列化好的StageBuilder反序列化,成为StageBuilder实例。 + + 8.如果这个stage是window类型的WindowChainStage,ChainPipeline调用各个stage的 + doProcessAfterRefreshConfigurable。这里会将用于数据接收的window实例化赋值; + + 9.OutputChainStage此时会从统一管理点IConfigurableService查询出sink实例,并赋值给自己sink字段; + + +### ChainPipeline的启动 +```java +pipeline.startChannel(); +``` + +将ChainPipeline作为整个数据接收的入口,并启动source; + +当source有数据进来时,ChainPipeline将会收到数据;具体方法是ChainPipeline的doMessageInner方法; + +该方法将数据封装承AbstractContext后,向后传递; + + diff --git "a/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" new file mode 100644 index 00000000..1eb791cb --- /dev/null +++ "b/docs/design/4.\346\225\260\346\215\256\347\232\204\346\265\201\350\275\254\350\277\207\347\250\213.md" @@ -0,0 +1,63 @@ +###总体过程 + + + +数据流转整体过程如图所示,黑色箭头线是数据流,橙色为控制流。数据的整体流向是从source中接收到,经过 +AbstractSource判断是否发出系统消息,在进入ChainPipeline,ChainPipeline根据之间构建好的处理拓扑图,使用 +深度优先策略找出下一个处理节点stage,交给Pipeline。Pipeline发现如果是系统消息则对stage执行特殊的控制逻辑, +如果不是,则用stage来处理具体数据。 + +### 无window算子执行流程 +- source从RocketMQ中消费数据,进入RocketMQSource的父类AbstractSource; +- AbstractSource启动控制流,判断是否数据来自新分片,如果是,首先向下游传递一条NewSplitMessage消息,等待系 + 统消息处理完成返回后,才能继续处理该数据。 +- NewSplitMessage进入Pipeline,如果是系统消息,stage执行该类系统消息对应的控制操作。如果不是系统消息则用 +stage处理数据; +- Pipeline执行完成后,返回到ChainPipeline,选择下一个stage继续执行; +- 遍历stage直到结束。 + +### 含有window算子执行流程 + + + +- 数据流和控制流在上述流程一致,即先进入source,然后由AbstractSource判断是否发出发出系统消息,再进入 + ChainPipeline按照已经构建好的拓扑图执行。 +- 不同的是,如果是window算子,那么这条数据在执行具体计算之前需要先按照groupBy分组,在执行算子,例如count。 +分组操作需要借助于shuffle topic完成,即写入shuffle topic之前先按照groupBy的值,计算数据写入目的 + MessageQueue,相同groupBy值的数据将被写入一个MessageQueue中。这样shuffle数据被读取时, + groupBy值相同的数据总会被一个client处理,达到按照groupBy分组处理的效果。 + +- ShuffleChannel会自动订阅、消费shuffle topic。数据会经过shuffle并在ShuffleChannel中再次被消费到。 +- 判断是否是系统消息,如果是,执行该种类系统消息对应的控制流操作。 +- 如果不是系统消息,触发window中算子计算,比如算子是count,就对某个key出现的次数加1;count算子用到的状 + 态会在接收到NewSplitMessage类型系统消息时提前加载好。计算结束后的状态保存到RocksDB或者mysql中。 + +- window到时间后,将计算结果输出到下游stage继续计算,并清理RocksDB、Mysql中对应的状态。 + + +### 系统消息 + +#### NewSplitMessage +当发现数据来自新分片(MessageQueue)时,由AbstractSource产生并向下游拓扑传递。 + +作用于window算子,使其提前加载该分片对应的状态数据到内存,使得状态数据对该分片数据进行计算时,能使用 +到对应的状态,得出正确的结果。 + +#### CheckPointMessage + +##### 产生时机: +- 消费分片移除时; +- RocketMQ-streams向broker提交消费offset时; +- 处理完一批次消息后; + +##### 作用 +- 作用于各个缓存,例如将数据写入shuffle topic之前的WindowCache,使缓存中数据写出到下游。 +- 作用于sink,将sink中缓存而未写出的数据写出; +- 将有状态算子的状态flush到存储; + +#### RemoveSplitMessage +比较RocketMQ client触发rebalance前后消费的分片,如果某个分片不在被消费,需要将该分片移除,在移除该分配时发出 +RemoveSplitMessage类型消息。 + +作用于window算子,将RocksDB中状态清除; + diff --git "a/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" new file mode 100644 index 00000000..4e8be748 --- /dev/null +++ "b/docs/design/5.Window\347\256\227\345\255\220\350\247\243\346\236\220.md" @@ -0,0 +1,55 @@ +### window算子初始化 +window的实例化和初始化时机,与普通无状态算子一样,在构建DataStream阶段以stage形式加入pipeline。在启动 +DataStream阶段完成window的初始化。 + + + +- 给window初始化WindowStorage用户状态存储; + + WindowStorage包括localStorage存储和remoteStorage存储;localStorage使用RocksDB, + remoteStorage使用mysql; + +- 向window添加一个WindowCache的匿名实例,用于存储写入shuffle topic之前数据; +- 向window添加SQLCache,作为写入Mysql之前的缓存; +- 向window添加ShuffleChannel,作为写出shuffle和接收来自shufffle topic数据的通道; + + +### ShuffleChannel写出shuffle数据 +AbstractShuffleWindow的doMessage方法,将数据写入shuffleChannel +```java +public AbstractContext<IMessage> doMessage(IMessage message, AbstractContext context) { + shuffleChannel.startChannel(); + return super.doMessage(message, context); +} +``` + +- shuffleChannel.startChannel +启动shuffleChannel中的consumer,从shuffletopic中消费数据;如果有消费到数据,将由 + shuffleChannel的doMessage处理。 + +- AbstractWindow.doMessage方法 + +对于一条消息来说,window 首先需要检查是否有窗口实例,如果没有则创建。如果窗口实例已经超过最大的watermark, +数据丢弃,否则进行消息积累 消息会先经历batchAdd 然后flush加入到windowCache中;windowCache定时触发,加入到 +shuffleMsgCache中,shuffleMsgCache中定时发出,用shuffleMsgCache中的producer写出到rocketmq。 + +### ShuffleChannel接收到shuffle数据 +ShuffleChannel#doMessage方法; + +将shuffle消息加入到shuffleCache中 + +最终进入ShuffleCache#batchInsert中 + +WindowOperator#shuffleCalculate中 + +实际窗口计算:WindowValue#calculate + +计算后并不会马上触发窗口,窗口需要定时出发 + +### window触发 + WindowFireSource#startSource启动定时任务,1s检查一次窗口是否触发WindowFireSource#fireWindowInstance +WindowOperator#fireWindowInstance + +windowFireSource.executeMessage + +windowFireSource.executeMessage这个方法里面会执行pipeline的下个节点 \ No newline at end of file diff --git "a/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" "b/docs/design/6.RocketMQ-streams\345\246\202\344\275\225\345\256\236\347\216\260\345\256\271\351\224\231.md" new file mode 100644 index 00000000..e69de29b diff --git "a/docs/images/Pipeline\347\261\273\345\233\276.png" "b/docs/images/Pipeline\347\261\273\345\233\276.png" new file mode 100644 index 00000000..dafe81a1 Binary files /dev/null and "b/docs/images/Pipeline\347\261\273\345\233\276.png" differ diff --git a/docs/images/img.png b/docs/images/img.png deleted file mode 100644 index b814adf7..00000000 Binary files a/docs/images/img.png and /dev/null differ diff --git a/docs/images/img_1.png b/docs/images/img_1.png deleted file mode 100644 index 16a45cc1..00000000 Binary files a/docs/images/img_1.png and /dev/null differ diff --git a/docs/images/img_2.png b/docs/images/img_2.png deleted file mode 100644 index 0b75ab05..00000000 Binary files a/docs/images/img_2.png and /dev/null differ diff --git a/docs/images/window.png b/docs/images/window.png new file mode 100644 index 00000000..30ba8945 Binary files /dev/null and b/docs/images/window.png differ diff --git "a/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" new file mode 100644 index 00000000..5eba9cec Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\346\236\266\346\236\204\345\233\276.png" differ diff --git "a/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" new file mode 100644 index 00000000..7a68947e Binary files /dev/null and "b/docs/images/\346\200\273\344\275\223\350\277\207\347\250\213.png" differ diff --git "a/docs/images/\346\211\251\345\256\271\345\211\215.png" "b/docs/images/\346\211\251\345\256\271\345\211\215.png" new file mode 100644 index 00000000..5232b762 Binary files /dev/null and "b/docs/images/\346\211\251\345\256\271\345\211\215.png" differ diff --git "a/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" new file mode 100644 index 00000000..a9ec479c Binary files /dev/null and "b/docs/images/\346\234\211\347\212\266\346\200\201\347\256\227\345\255\220.png" differ diff --git "a/docs/images/\347\212\266\346\200\201.png" "b/docs/images/\347\212\266\346\200\201.png" new file mode 100644 index 00000000..e2fd9b2e Binary files /dev/null and "b/docs/images/\347\212\266\346\200\201.png" differ diff --git "a/docs/images/\347\274\251\345\256\271.png" "b/docs/images/\347\274\251\345\256\271.png" new file mode 100644 index 00000000..05dcee41 Binary files /dev/null and "b/docs/images/\347\274\251\345\256\271.png" differ diff --git a/quick_start.md b/docs/quick_start/README.md similarity index 100% copy from quick_start.md copy to docs/quick_start/README.md diff --git a/quick_start.md b/quick_start.md index a60dbb95..adcb529d 100644 --- a/quick_start.md +++ b/quick_start.md @@ -1,46 +1,84 @@ -# 快速开发 +## rocketmq-streams 快速搭建 +--- -## 引入相关的jar包 +### 前言 -```xml +本文档主要介绍如何基于rocketmq-streams快速搭建流处理任务,搭建过程中某些例子会用到rocketmq,可以参考[rocketmq搭建文档](https://rocketmq.apache.org/docs/quick-start/) -<dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-streams-clients</artifactId> -</dependency> +### 1、源码构建 -``` +#### 1.1、构建环境 -## 开发实时应用程序 +- JDK 1.8 and above +- Maven 3.2 and above -```java +#### 1.2、构建Rocketmq-streams -public class RocketmqExample { +```shell +git clone https://github.com/apache/rocketmq-streams.git +cd rocketmq-streams +mvn clean -DskipTests install -U - public static void main(String[] args) { +``` - DataStreamSource dataStream = StreamBuilder.dataStream("test_namespace", "graph_pipeline"); +### 2、基于rocketmq-streams创建应用 - dataStream - .fromFile("data.csv", false) //构建实时任务的数据源 - .map(message -> message.split(",")) //构建实时任务处理的逻辑过程 - .toPrint(1) //构建实时任务的输出 - .start(); //启动实时任务 - } -} +#### 2.1、pom依赖 +```xml + +<dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-clients</artifactId> +</dependency> ``` -## 运行 +#### 2.2、shade clients依赖包 -打包 +```xml -```shell -mvn -Prelease-all -DskipTests clean install -U +<build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-shade-plugin</artifactId> + <version>3.2.1</version> + <executions> + <execution> + <phase>package</phase> + <goals> + <goal>shade</goal> + </goals> + <configuration> + <minimizeJar>false</minimizeJar> + <shadedArtifactAttached>true</shadedArtifactAttached> + <artifactSet> + <includes> + <include>org.apache.rocketmq:rocketmq-streams-clients</include> + </includes> + </artifactSet> + </configuration> + </execution> + </executions> + </plugin> + </plugins> +</build> ``` -运行 +#### 2.3、编写业务代码 + +快速编写一个统计页面点击次数的小程序:Please see the [rocketmq-streams-examples](rocketmq-streams-examples/README.md) + +#### 2.4、运行 + +- 前提:在从rocketmq中读取数据做流处理时,需要运行topic在rocketmq中自动创建,因为做groupBy操作时,需要用到rocketmq作为shuffle数据的读写目的地。 +- 命令: -```shell - java -jar jarName mainClass ``` + java -jar XXXX-shade.jar \ + -Dlog4j.level=ERROR \ + -Dlog4j.home=/logs \ + -Xms1024m \ + -Xmx1024m +``` + diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java index 0c10a9bf..14e1ffd3 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java @@ -524,4 +524,12 @@ public class SessionOperator extends WindowOperator { } return numer; } + + public int getSessionTimeOut() { + return sessionTimeOut; + } + + public void setSessionTimeOut(int sessionTimeOut) { + this.sessionTimeOut = sessionTimeOut; + } } \ No newline at end of file diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java index f356a286..103f40e3 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java @@ -60,6 +60,7 @@ public class JoinWindow extends AbstractShuffleWindow { protected String joinType;//join类型,值为INNER,LEFT protected String expression;//条件表达式。在存在非等值比较时使用 + protected String rightDependentTableName; @Override protected int doFireWindowInstance(WindowInstance instance) { @@ -557,4 +558,12 @@ public class JoinWindow extends AbstractShuffleWindow { public void setExpression(String expression) { this.expression = expression; } + + public String getRightDependentTableName() { + return rightDependentTableName; + } + + public void setRightDependentTableName(String rightDependentTableName) { + this.rightDependentTableName = rightDependentTableName; + } } diff --git a/stream_sink.md b/stream_sink.md index d7718865..a30aae59 100644 --- a/stream_sink.md +++ b/stream_sink.md @@ -54,7 +54,7 @@ String topic=.....; //rocketmq 的topic String namesrvAddress=......; //rocketmq的nameserver - DataStream dataStream=dataStream.toRocketmq(topic,namesrvAddress); + DataStream dataStream=dataStreamSource.toRocketmq(topic,namesrvAddress); ``` @@ -65,7 +65,7 @@ String topic=.....; //rocketmq 的topic String groupName=.....; // rocketmq的消费组 String namesrvAddress=......; //rocketmq的nameserver - DataStream dataStream=dataStream.toRocketmq(topic,groupName,namesrvAddress); + DataStream dataStream=dataStreamSource.toRocketmq(topic,groupName,namesrvAddress); ``` @@ -77,7 +77,7 @@ String groupName=.....; // rocketmq的消费组 String namesrvAddress=......; //rocketmq的nameserver String tags=......; // rocketmq的tag信息 - DataStream dataStream=dataStream.toRocketmq(topic,tags,groupName,namesrvAddress); + DataStream dataStream=dataStreamSource.toRocketmq(topic,tags,groupName,namesrvAddress); ``` ## kafka @@ -95,7 +95,7 @@ String url=......; String clientId=......; String topic=......; - DataStream dataStream=dataStream.toMqtt(url,cliientId,topic); + DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic); ``` @@ -108,7 +108,7 @@ String topic=......; String username=......; String password=......; - DataStream dataStream=dataStream.toMqtt(url,cliientId,topic,username,password); + DataStream dataStream=dataStreamSource.toMqtt(url,cliientId,topic,username,password); ``` diff --git a/stream_source.md b/stream_source.md index 8ae7b7e4..77fb96c4 100644 --- a/stream_source.md +++ b/stream_source.md @@ -54,7 +54,7 @@ String groupName = .....; // rocketmq的消费组 String namesrvAddress = ......; //rocketmq的nameserver Boolean isJsonData = true; //是否json - String tags = ......; //rocketmq的tag信息 + String tags = ......; // rocketmq的tag信息 DataStream dataStream = dataStreamSource.fromRocketmq(topic, groupName, tags, isJsonData, namesrvAddress); ```
