This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit a5a6ddde987adf1143fc5a1f4ed6902dff5368aa Merge: 93d7f775 9d3ae58b Author: 维章 <[email protected]> AuthorDate: Mon May 23 15:32:38 2022 +0800 merge from snapshot-1.0.3 NOTICE | 2 +- README.md | 16 +- SUMMARY.md | 7 + docs/README.md | 142 ------ docs/SUMMARY.md | 8 - ...225\264\344\275\223\346\236\266\346\236\204.md" | 33 -- .../2.\346\236\204\345\273\272DataStream.md" | 73 ---- .../3.\345\220\257\345\212\250DataStream.md" | 53 --- ...265\201\350\275\254\350\277\207\347\250\213.md" | 63 --- ...256\227\345\255\220\350\247\243\346\236\220.md" | 55 --- ...256\236\347\216\260\345\256\271\351\224\231.md" | 0 "docs/images/Pipeline\347\261\273\345\233\276.png" | Bin 44207 -> 0 bytes docs/images/img.png | Bin 0 -> 38684 bytes docs/images/img_1.png | Bin 0 -> 43711 bytes docs/images/img_2.png | Bin 0 -> 103151 bytes docs/images/window.png | Bin 241692 -> 0 bytes ...75\223\346\236\266\346\236\204\345\233\276.png" | Bin 60493 -> 0 bytes ...00\273\344\275\223\350\277\207\347\250\213.png" | Bin 44252 -> 0 bytes .../\346\211\251\345\256\271\345\211\215.png" | Bin 56733 -> 0 bytes ...12\266\346\200\201\347\256\227\345\255\220.png" | Bin 35766 -> 0 bytes "docs/images/\347\212\266\346\200\201.png" | Bin 47527 -> 0 bytes "docs/images/\347\274\251\345\256\271.png" | Bin 51087 -> 0 bytes docs/quick_start/README.md | 46 -- pom.xml | 118 ++--- quick_start.md | 92 ++-- .../pom.xml | 41 +- rocketmq-streams-cep/src/test/resources/log4j.xml | 36 ++ rocketmq-streams-channel-db/pom.xml | 4 +- .../streams/db/sink/AbstractMultiTableSink.java | 12 +- .../streams/db/sink/DynamicMultipleDBSink.java | 11 +- .../streams/db/sink/SelfMultiTableSink.java | 2 +- .../streams/db/sink/SplitBySerialNumber.java | 2 +- .../streams/db/sink/SplitByTimeMultiTableSink.java | 2 +- rocketmq-streams-channel-es/pom.xml | 27 +- .../{ESSinkBuilder.java => ESChannelBuilder.java} | 54 +-- .../rocketmq/streams/es/sink/ESSinkBuilder.java | 1 + .../streams/es/sink/ESSinkOnlyChannel.java | 43 +- .../apache/rocketmq/streams/es/sink/EsClient.java | 135 ++++++ rocketmq-streams-channel-http/pom.xml | 4 +- rocketmq-streams-channel-kafka/pom.xml | 32 ++ .../streams/kafka/KafkaChannelBuilder.java | 52 ++- .../apache/rocketmq/streams/kafka/KafkaSplit.java | 55 +++ .../rocketmq/streams/kafka/sink/KafkaSink.java | 200 +++++++++ .../rocketmq/streams/kafka/source/KafkaSource.java | 238 ++++++++++ .../rocketmq/streams/kafka/KafkaChannelTest.java | 104 +++++ .../src/test/resources/log4j.xml | 20 + rocketmq-streams-channel-mqtt/pom.xml | 20 +- .../rocketmq/streams/mqtt/source/PahoSource.java | 41 +- rocketmq-streams-channel-rocketmq/pom.xml | 28 +- .../apache/rocketmq/streams/debug/DebugWriter.java | 92 ++-- .../apache/rocketmq/streams/sink/RocketMQSink.java | 55 ++- .../rocketmq/streams/RocketMQChannelTest.java | 2 +- rocketmq-streams-channel-syslog/pom.xml | 19 +- .../rocketmq/streams/syslog/SyslogChannel.java | 35 +- .../streams/syslog/SyslogChannelBuilder.java | 4 + .../streams/syslog/SyslogChannelManager.java | 6 +- .../rocketmq/streams/syslog/SyslogServer.java | 35 +- .../rocketmq/streams/syslog/SyslogClient.java | 22 +- rocketmq-streams-clients/pom.xml | 14 +- .../streams/client/source/DataStreamSource.java | 19 + .../streams/client/transform/DataStream.java | 41 +- .../streams/client/transform/JoinStream.java | 5 +- .../streams/client/transform/SplitStream.java | 24 +- .../streams/client/transform/WindowStream.java | 50 ++- .../rocketmq/streams/client/ApplicationTest.java | 57 +++ .../rocketmq/streams/client/MqttSourceExample.java | 80 ++++ .../{sink/UserDefinedSink.java => ScriptTest.java} | 27 +- .../apache/rocketmq/streams/client/WindowTest.java | 14 +- .../rocketmq/streams/client/example/JoinTest.java | 7 +- .../rocketmq/streams/client/example/SplitTest.java | 31 +- .../streams/client/sink/UserDefinedSink.java | 2 +- .../client/sink/UserDefinedSupportShuffleSink.java | 4 +- rocketmq-streams-commons/pom.xml | 46 +- .../MappedByteBufferTableWithPrimaryIndex.java | 482 +++++++++++++++++++++ .../streams/common/cache/compress/KVAddress.java | 48 +- .../streams/common/channel/AbstractChannel.java | 14 +- .../AbstractSupportShuffleChannelBuilder.java | 2 +- .../common/channel/builder/IChannelBuilder.java | 25 +- .../channel/builder/IShuffleChannelBuilder.java | 4 +- .../common/channel/impl/CollectionSink.java | 39 +- .../common/channel/impl/CollectionSinkBuilder.java | 34 +- .../common/channel/impl/PrintChannelBuilder.java | 30 +- .../channel/impl/file/FileChannelBuilder.java | 45 +- .../streams/common/channel/impl/file/FileSink.java | 4 +- .../common/channel/impl/file/FileSource.java | 2 +- .../channel/impl/memory/MemoryChannelBuilder.java | 64 +++ .../common/channel/impl/memory/MemorySink.java | 4 +- .../channel/impl/view/ViewChannelBuilder.java | 43 +- .../streams/common/channel/impl/view/ViewSink.java | 31 +- .../common/channel/impl/view/ViewSource.java | 66 +++ .../streams/common/channel/sink/AbstractSink.java | 17 +- .../channel/sink/AbstractSupportShuffleSink.java | 8 +- .../sink/AbstractSupportShuffleUDFSink.java | 6 +- .../common/channel/sink/AbstractUDFSink.java | 23 +- .../streams/common/channel/sink/ISink.java | 9 +- .../impl/AbstractMultiSplitMessageCache.java | 22 +- .../common/channel/source/AbstractSource.java | 89 +++- .../channel/source/AbstractUnreliableSource.java | 2 +- .../streams/common/channel/split/ISplit.java | 5 +- .../streams/common/component/ComponentCreator.java | 4 + .../common/configurable/AbstractConfigurable.java | 17 +- .../common/configurable/BasedConfigurable.java | 13 +- .../common/configurable/IConfigurableService.java | 5 +- .../streams/common/configure/ConfigureFileKey.java | 7 +- .../streams/common/context/AbstractContext.java | 33 +- .../streams/common/context/MessageHeader.java | 89 ++-- .../streams/common/datatype/IntDataType.java | 3 +- .../streams/common/datatype/ShortDataType.java | 2 +- .../streams/common/interfaces/ISerialize.java | 9 +- .../rocketmq/streams/common/model/NameCreator.java | 14 +- .../NameCreatorContext.java} | 36 +- .../common/monitor/ConsoleMonitorManager.java | 412 ++++++++++++++++++ .../streams/common/monitor/DataSyncConstants.java | 54 +++ .../streams/common/monitor/HttpClient.java | 116 +++++ .../rocketmq/streams/common/monitor/HttpUtil.java | 248 +++++++++++ .../monitor/MonitorDataSyncServiceFactory.java | 61 +++ .../common/monitor/group/MonitorCommander.java | 4 +- .../streams/common/monitor/impl/DipperMonitor.java | 2 +- .../streams/common/monitor/model/JobStage.java | 350 +++++++++++++++ .../streams/common/monitor/model/TraceIdsDO.java | 126 ++++++ .../common/monitor/model/TraceMonitorDO.java | 250 +++++++++++ .../service/MonitorDataSyncService.java} | 16 +- .../service/impl/DBMonitorDataSyncImpl.java | 63 +++ .../service/impl/HttpMonitorDataSyncImpl.java | 151 +++++++ .../service/impl/RocketMQMonitorDataSyncImpl.java | 185 ++++++++ .../optimization/IHomologousOptimization.java | 2 +- .../common/optimization/MessageGlobleTrace.java | 16 +- .../streams/common/optimization/Re2Engine.java | 47 +- .../common/optimization/TaskOptimization.java | 17 +- .../optimization/fingerprint/FingerprintCache.java | 4 +- .../optimization/fingerprint/PreFingerprint.java | 54 ++- .../streams/common/schedule/ScheduleTask.java | 4 +- .../common/threadpool/ThreadPoolFactory.java | 34 +- .../AbstractMutilPipelineChainPipline.java | 81 ++-- .../streams/common/topology/ChainPipeline.java | 255 ++++++----- .../streams/common/topology/ChainStage.java | 32 +- .../common/topology/builder/PipelineBuilder.java | 139 ++++-- .../common/topology/metric/NotFireReason.java | 176 ++++++++ .../streams/common/topology/metric/StageGroup.java | 248 +++++++++++ .../common/topology/metric/StageMetric.java | 138 ++++++ .../common/topology/model/AbstractStage.java | 177 +++++--- .../streams/common/topology/model/IWindow.java | 3 +- .../streams/common/topology/model/Pipeline.java | 56 ++- .../topology/model/PipelineSourceJoiner.java | 48 -- .../topology/shuffle/IShuffleKeyGenerator.java | 11 +- .../common/topology/shuffle/ShuffleMQCreator.java | 398 ++++++++++------- .../topology/stages/AbstractWindowStage.java | 5 +- .../stages/EmptyChainStage.java} | 27 +- .../common/topology/stages/FilterChainStage.java | 157 ++----- .../common/topology/stages/JoinChainStage.java | 3 +- .../common/topology/stages/JoinEndChainStage.java | 11 +- .../topology/stages/JoinStartChainStage.java | 67 +++ .../common/topology/stages/OutputChainStage.java | 81 ++-- .../topology/stages/ShuffleConsumerChainStage.java | 193 +++++++++ .../topology/stages/ShuffleProducerChainStage.java | 345 +++++++++++++++ .../topology/stages/SubPiplineChainStage.java | 138 ------ .../common/topology/stages/UnionChainStage.java | 3 +- .../common/topology/stages/UnionEndChainStage.java | 11 +- ...onChainStage.java => UnionStartChainStage.java} | 43 +- .../ViewChainStage.java} | 479 ++++++++++---------- .../common/topology/stages/udf/UDFChainStage.java | 27 +- .../streams/common/topology/task/StreamsTask.java | 444 +++---------------- .../streams/common/topology/task/TaskAssigner.java | 20 +- .../streams/common/utils/ConfigurableUtil.java | 6 +- .../streams/common/utils/ContantsUtil.java | 33 +- .../streams/common/utils/DataTypeUtil.java | 16 +- .../rocketmq/streams/common/utils/FileUtil.java | 89 +++- .../streams/common/utils/InstantiationUtil.java | 48 +- .../streams/common/utils/JsonableUtil.java | 5 + .../rocketmq/streams/common/utils/KryoUtil.java | 214 +++++++++ .../streams/common/utils/NameCreatorUtil.java | 60 --- .../streams/common/utils/PipelineHTMLUtil.java | 299 +++++++++++++ .../streams/common/utils/PropertiesUtils.java | 2 +- .../rocketmq/streams/common/utils/ReflectUtil.java | 62 ++- .../streams/common/utils/SerializeUtil.java | 23 +- .../streams/common/utils/ServiceLoadUtil.java | 63 +++ .../rocketmq/streams/common/utils/TraceUtil.java | 23 +- .../rocketmq/streams/common/channel/SinkTest.java | 4 +- rocketmq-streams-configurable/pom.xml | 9 +- .../configurable/ConfigurableComponent.java | 17 +- .../service/AbstractConfigurableService.java | 98 ++--- .../service/impl}/FileConfigureService.java | 2 +- .../impl}/FileSupportParentConfigureService.java | 2 +- .../service/impl/HttpConfigureService.java | 377 ++++++++++++++++ .../impl/HttpSupportParentConfigureService.java | 20 +- .../service/impl}/MemoryConfigureService.java | 2 +- .../impl}/MemorySupportParentConfigureService.java | 2 +- rocketmq-streams-connectors/pom.xml | 9 +- .../{IBounded.java => IScheduleCallback.java} | 9 +- .../connectors/source/AbstractPullSource.java | 148 ++++--- .../connectors/source/MutilBatchTaskSource.java | 158 +++++++ rocketmq-streams-db-operator/pom.xml | 4 +- .../streams/db/configuable/DBConfigureService.java | 11 +- rocketmq-streams-dim/pom.xml | 4 +- .../intelligence/AbstractIntelligenceCache.java | 5 +- rocketmq-streams-examples/pom.xml | 20 +- .../mutilconsumer/MultiStreamsExample.java | 3 + .../streams/examples/send/ProducerFromFile.java | 8 +- .../streams/examples/source/FileSourceExample.java | 2 +- .../src/main/resources/joinData-2.txt | 4 - rocketmq-streams-filter/pom.xml | 3 +- .../streams/filter/builder/ExpressionBuilder.java | 49 +-- .../streams/filter/context/RuleContext.java | 31 +- .../filter/engine/impl/DefaultRuleEngine.java | 29 +- .../function/expression/CompareFunction.java | 16 +- .../rocketmq/streams/filter/operator/Rule.java | 58 +++ .../expression/ExpressionRelationParser.java | 9 +- .../expression/ExpressionRelationPaser.java | 107 ----- .../operator/expression/GroupExpression.java | 3 +- .../operator/expression/RelationExpression.java | 35 +- .../PiplineLogFingerprintAnalysis.java | 6 +- .../dependency/BlinkRuleV2Expression.java | 5 +- .../optimization/dependency/DependencyTree.java | 22 +- .../dependency/SimplePipelineTree.java | 3 +- .../dependency/StateLessDependencyTree.java | 84 ++++ .../optimization/homologous/HomologousCompute.java | 5 +- .../homologous/HomologousOptimization.java | 13 +- rocketmq-streams-lease/pom.xml | 4 +- rocketmq-streams-runner/assembly/distribution.xml | 69 --- rocketmq-streams-runner/assembly/standalone.xml | 72 --- rocketmq-streams-runner/bin/start.sh | 58 --- rocketmq-streams-runner/bin/stop.sh | 33 -- rocketmq-streams-runner/pom.xml | 80 ---- .../src/main/resources/log4j.xml | 51 --- rocketmq-streams-schedule/pom.xml | 4 +- .../schedule/job/ConfigurableExecutorJob.java | 30 +- rocketmq-streams-script/pom.xml | 4 +- .../function/aggregation/LastValueAccumulator.java | 67 +++ .../function/impl/distinct/DistinctFunction.java | 1 - .../function/impl/json/JsonCreatorFunction.java | 4 +- .../function/impl/json/UDTFFieldNameFunction.java | 50 +++ .../script/function/impl/parser/GrokFunction.java | 4 +- .../function/impl/parser/Paser2JsonFunction.java | 17 +- .../function/impl/parser/PaserBySplitFunction.java | 44 +- .../function/impl/parser/RegexParserFunction.java | 24 +- .../script/function/impl/router/RouteFunction.java | 4 +- .../script/operator/impl/AggregationScript.java | 19 +- .../streams/script/service/IAccumulator.java | 4 +- .../script/service/udf/SimpleUDAFScript.java | 35 +- .../streams/script/service/udf/UDFScript.java | 26 +- .../streams/script/function/FunctionTest.java | 19 +- rocketmq-streams-serviceloader/pom.xml | 4 +- rocketmq-streams-state/pom.xml | 4 +- .../streams/state/kv/rocksdb/RocksDBOperator.java | 37 +- rocketmq-streams-transport-minio/pom.xml | 4 +- rocketmq-streams-window/pom.xml | 9 +- .../window/minibatch/MiniBatchMsgCache.java | 58 +++ .../window/minibatch/ShuffleMessageCache.java | 187 ++++++++ .../rocketmq/streams/window/model/WindowCache.java | 182 +++----- .../streams/window/model/WindowInstance.java | 22 +- .../window/operator/AbstractShuffleWindow.java | 18 +- .../streams/window/operator/AbstractWindow.java | 73 +++- .../window/operator/impl/SessionOperator.java | 2 +- .../window/operator/impl/WindowOperator.java | 8 +- .../streams/window/operator/join/JoinWindow.java | 3 +- .../window/shuffle/AbstractSystemChannel.java | 94 ++-- .../streams/window/shuffle/ShuffleCache.java | 14 +- .../streams/window/shuffle/ShuffleChannel.java | 54 +-- .../streams/window/state/impl/WindowValue.java | 54 ++- .../streams/window/trigger/WindowTrigger.java | 1 - .../rocketmq/streams/window/util/ShuffleUtil.java | 62 +++ .../org/apache/rocketmq/streams/RocksdbTest.java | 2 +- docs/stream_sink/README.md => stream_sink.md | 19 +- docs/stream_source/README.md => stream_source.md | 30 +- .../README.md => stream_transform.md | 0 265 files changed, 9865 insertions(+), 4083 deletions(-) diff --cc pom.xml index 70288c26,7caa0cbc..2d139613 --- a/pom.xml +++ b/pom.xml @@@ -184,9 -137,9 +136,10 @@@ <exclude>**/*.xml</exclude> <exclude>**/*.sh</exclude> <exclude>**/*.out</exclude> + <exclude>**/*.sql</exclude> <exclude>**/*.properties</exclude> <exclude>docs/**/*</exclude> + <exclude>**/*.sql</exclude> </excludes> </configuration> </plugin> diff --cc rocketmq-streams-channel-es/pom.xml index 15723b3c,fe50aeef..b4ce4d85 --- a/rocketmq-streams-channel-es/pom.xml +++ b/rocketmq-streams-channel-es/pom.xml @@@ -20,8 -7,8 +7,8 @@@ <parent> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams</artifactId> - <version>1.0.2-SNAPSHOT</version> + <version>1.0.2-preview-SNAPSHOT</version> - </parent> + </parent> <artifactId>rocketmq-streams-channel-es</artifactId> <name>ROCKETMQ STREAMS :: channel-es</name> <packaging>jar</packaging> diff --cc rocketmq-streams-channel-kafka/pom.xml index 00000000,89bd645c..76b73909 mode 000000,100644..100644 --- a/rocketmq-streams-channel-kafka/pom.xml +++ b/rocketmq-streams-channel-kafka/pom.xml @@@ -1,0 -1,32 +1,32 @@@ + <?xml version="1.0" encoding="UTF-8"?> + <project xmlns="http://maven.apache.org/POM/4.0.0" + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>rocketmq-streams</artifactId> + <groupId>org.apache.rocketmq</groupId> - <version>1.0.2-SNAPSHOT</version> ++ <version>1.0.2-preview-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>rocketmq-streams-channel-kafka</artifactId> + <name>ROCKETMQ STREAMS :: channel-kafka</name> + + <properties> + <maven.compiler.source>8</maven.compiler.source> + <maven.compiler.target>8</maven.compiler.target> + </properties> + + + <dependencies> + <dependency> + <groupId>org.apache.kafka</groupId> + <artifactId>kafka_2.12</artifactId> + </dependency> + + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-commons</artifactId> + </dependency> + </dependencies> + </project> diff --cc rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java index bb6e9454,d825603a..1de92e31 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java @@@ -17,8 -17,10 +17,11 @@@ package org.apache.rocketmq.streams.sink; + import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Collections; + import java.util.Comparator; + import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@@ -27,8 -29,10 +30,9 @@@ import java.util.Set import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer; -import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; + import org.apache.rocketmq.common.MixAll; import org.apache.rocketmq.common.TopicConfig; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.common.message.MessageQueue; @@@ -149,10 -154,10 +154,6 @@@ public class RocketMQSink extends Abstr destroy(); producer = new DefaultMQProducer(groupName + "producer", true, null); try { -- //please not use the code,the name srv addr may be empty in jmenv --// if (this.namesrvAddr == null || "".equals(this.namesrvAddr)) { --// throw new RuntimeException("namesrvAddr can not be null."); --// } if (StringUtil.isNotEmpty(this.namesrvAddr)) { producer.setNamesrvAddr(this.namesrvAddr); @@@ -263,7 -275,7 +272,7 @@@ @Override public int getSplitNum() { - List<ISplit> splits = getSplitList(); - List<ISplit<?, ?>> splits = getSplitList(); ++ List<ISplit<?,?>> splits = getSplitList(); if (splits == null || splits.size() == 0) { return 0; } diff --cc rocketmq-streams-clients/pom.xml index 8fb31672,94c7f3e7..2d15efce --- a/rocketmq-streams-clients/pom.xml +++ b/rocketmq-streams-clients/pom.xml @@@ -50,6 -57,18 +57,11 @@@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-filter</artifactId> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-streams-channel-syslog</artifactId> + </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-streams-channel-kafka</artifactId> - </dependency> - <dependency> - <groupId>org.apache.rocketmq</groupId> - <artifactId>rocketmq-streams-dbinit</artifactId> - </dependency> ++ <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-window</artifactId> diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java index 9422519a,5004f230..1e4b15b6 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/source/DataStreamSource.java @@@ -178,6 -179,25 +178,25 @@@ public class DataStreamSource return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null); } - public DataStream fromKafka(String endpoint, String topic, String groupName) { - return fromKafka(endpoint, topic, groupName, true); - } - - public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) { - return fromKafka(endpoint, topic, groupName, isJson, 1); - } - - public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) { - KafkaSource kafkaChannel = new KafkaSource(); - kafkaChannel.setBootstrapServers(endpoint); - kafkaChannel.setTopic(topic); - kafkaChannel.setGroupName(groupName); - kafkaChannel.setJsonData(isJson); - kafkaChannel.setMaxThread(maxThread); - this.mainPipelineBuilder.setSource(kafkaChannel); - return new DataStream(this.mainPipelineBuilder, null); - } ++// public DataStream fromKafka(String endpoint, String topic, String groupName) { ++// return fromKafka(endpoint, topic, groupName, true); ++// } ++// ++// public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson) { ++// return fromKafka(endpoint, topic, groupName, isJson, 1); ++// } ++// ++// public DataStream fromKafka(String endpoint, String topic, String groupName, Boolean isJson, int maxThread) { ++// KafkaSource kafkaChannel = new KafkaSource(); ++// kafkaChannel.setBootstrapServers(endpoint); ++// kafkaChannel.setTopic(topic); ++// kafkaChannel.setGroupName(groupName); ++// kafkaChannel.setJsonData(isJson); ++// kafkaChannel.setMaxThread(maxThread); ++// this.mainPipelineBuilder.setSource(kafkaChannel); ++// return new DataStream(this.mainPipelineBuilder, null); ++// } + public DataStream from(ISource<?> source) { this.mainPipelineBuilder.setSource(source); return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, null); diff --cc rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java index 80ae510e,96e24a0a..563af478 --- a/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java +++ b/rocketmq-streams-clients/src/main/java/org/apache/rocketmq/streams/client/transform/DataStream.java @@@ -51,8 -51,11 +51,10 @@@ import org.apache.rocketmq.streams.comm import org.apache.rocketmq.streams.common.topology.ChainStage; import org.apache.rocketmq.streams.common.topology.builder.IStageBuilder; import org.apache.rocketmq.streams.common.topology.builder.PipelineBuilder; -import org.apache.rocketmq.streams.common.topology.model.AbstractRule; import org.apache.rocketmq.streams.common.topology.model.Union; import org.apache.rocketmq.streams.common.topology.stages.FilterChainStage; + import org.apache.rocketmq.streams.common.topology.stages.ShuffleConsumerChainStage; + import org.apache.rocketmq.streams.common.topology.stages.ShuffleProducerChainStage; import org.apache.rocketmq.streams.common.topology.stages.udf.StageBuilder; import org.apache.rocketmq.streams.common.topology.stages.udf.UDFChainStage; import org.apache.rocketmq.streams.common.topology.stages.udf.UDFUnionChainStage; @@@ -646,6 -669,13 +667,13 @@@ public class DataStream implements Seri return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output); } - public DataStream toKafka(String bootstrapServers, String topic) { - KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic); - ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink); - this.mainPipelineBuilder.setTopologyStages(currentChainStage, output); - return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output); - } ++// public DataStream toKafka(String bootstrapServers, String topic) { ++// KafkaSink kafkaSink = new KafkaSink(bootstrapServers, topic); ++// ChainStage<?> output = this.mainPipelineBuilder.createStage(kafkaSink); ++// this.mainPipelineBuilder.setTopologyStages(currentChainStage, output); ++// return new DataStream(this.mainPipelineBuilder, this.otherPipelineBuilders, output); ++// } + public DataStream toEnhanceDBSink(String url, String userName, String password, String tableName) { EnhanceDBSink sink = new EnhanceDBSink(url, userName, password, tableName); ChainStage<?> output = this.mainPipelineBuilder.createStage(sink); diff --cc rocketmq-streams-commons/pom.xml index 5ea155d4,72af956e..2e73c6f8 --- a/rocketmq-streams-commons/pom.xml +++ b/rocketmq-streams-commons/pom.xml @@@ -98,11 -101,36 +101,40 @@@ <artifactId>re2j</artifactId> <version>1.6</version> </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + </dependency> + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-client</artifactId> + <exclusions> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-classic</artifactId> + </exclusion> + <exclusion> + <groupId>ch.qos.logback</groupId> + <artifactId>logback-core</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>de.ruedigermoeller</groupId> + <artifactId>fst</artifactId> + </dependency> + <dependency> + <groupId>com.esotericsoftware</groupId> + <artifactId>kryo</artifactId> + <version>5.3.0</version> + </dependency> + + <dependency> + <groupId>commons-codec</groupId> + <artifactId>commons-codec</artifactId> + <version>1.13</version> + </dependency> </dependencies> </project> diff --cc rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java index 939b1d54,320cbd12..2bc7e55f --- a/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java +++ b/rocketmq-streams-commons/src/main/java/org/apache/rocketmq/streams/common/channel/source/AbstractSource.java @@@ -18,9 -18,10 +18,11 @@@ package org.apache.rocketmq.streams.com import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; + import java.io.UnsupportedEncodingException; import java.util.ArrayList; + import java.util.Arrays; + import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@@ -58,15 -58,22 +60,23 @@@ import org.apache.rocketmq.streams.comm public abstract class AbstractSource extends BasedConfigurable implements ISource<AbstractSource>, ILifeCycle { public static String CHARSET = "UTF-8"; + /** + * 输入的消息是否为json + */ + protected Boolean isJsonData = true; + /** + * 输入的消息是否为json array + */ + protected Boolean msgIsJsonArray = false; - protected Boolean isJsonData = true;//输入的消息是否为json - protected Boolean msgIsJsonArray = false;//输入的消息是否为json array @ENVDependence - protected String groupName;//group name + protected String groupName; + protected int maxThread = Runtime.getRuntime().availableProcessors(); + @ENVDependence protected String topic = ""; + protected String namesrvAddr; /** * 多长时间做一次checkpoint */ @@@ -136,11 -167,9 +170,10 @@@ * @param message * @return */ - public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId, String offset) { + public AbstractContext doReceiveMessage(JSONObject message, boolean needSetCheckPoint, String queueId, + String offset) { Message msg = createMessage(message, queueId, offset, needSetCheckPoint); - AbstractContext context = executeMessage(msg); - return context; + return executeMessage(msg); } /** diff --cc rocketmq-streams-connectors/pom.xml index 8c8277e3,d544e3d5..e557a19a mode 100755,100644..100644 --- a/rocketmq-streams-connectors/pom.xml +++ b/rocketmq-streams-connectors/pom.xml diff --cc rocketmq-streams-examples/pom.xml index 62a4038c,da6a130b..7ced0cb0 --- a/rocketmq-streams-examples/pom.xml +++ b/rocketmq-streams-examples/pom.xml @@@ -39,6 -41,6 +41,22 @@@ <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-streams-clients</artifactId> </dependency> ++ <dependency> ++ <groupId>com.alibaba</groupId> ++ <artifactId>fastjson</artifactId> ++ </dependency> ++ <dependency> ++ <groupId>junit</groupId> ++ <artifactId>junit</artifactId> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.rocketmq</groupId> ++ <artifactId>rocketmq-client</artifactId> ++ </dependency> ++ <dependency> ++ <groupId>org.apache.rocketmq</groupId> ++ <artifactId>rocketmq-streams-commons</artifactId> ++ </dependency> </dependencies> <packaging>jar</packaging> diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java index f5f3d46c,2b823e44..8835e943 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/mutilconsumer/MultiStreamsExample.java @@@ -24,13 -23,11 +24,15 @@@ import com.alibaba.fastjson.JSONObject import java.util.Random; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; + ++import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.streams.client.StreamBuilder; import org.apache.rocketmq.streams.client.source.DataStreamSource; import org.apache.rocketmq.streams.client.strategy.WindowStrategy; ++import org.apache.rocketmq.streams.client.transform.DataStream; import org.apache.rocketmq.streams.client.transform.window.Time; import org.apache.rocketmq.streams.client.transform.window.TumblingWindow; +import org.apache.rocketmq.streams.examples.send.ProducerFromFile; import static org.apache.rocketmq.streams.examples.aggregate.Constant.NAMESRV_ADDRESS; import static org.apache.rocketmq.streams.examples.aggregate.Constant.RMQ_CONSUMER_GROUP_NAME; @@@ -64,6 -61,6 +66,7 @@@ public class MultiStreamsExample private static void runOneStreamsClient(int index) { DataStreamSource source = StreamBuilder.dataStream("namespace" + index, "pipeline" + index); ++ source.fromRocketmq( RMQ_TOPIC, RMQ_CONSUMER_GROUP_NAME, diff --cc rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java index 163d8116,58d3710c..0508c303 --- a/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java +++ b/rocketmq-streams-examples/src/main/java/org/apache/rocketmq/streams/examples/send/ProducerFromFile.java @@@ -17,8 -17,8 +17,12 @@@ * */ -package org.apache.rocketmq.streams.examples.aggregate; +package org.apache.rocketmq.streams.examples.send; + ++import org.apache.rocketmq.client.producer.DefaultMQProducer; ++import org.apache.rocketmq.common.message.Message; ++import org.apache.rocketmq.remoting.common.RemotingHelper; + import java.io.BufferedReader; import java.io.File; import java.io.FileReader; @@@ -26,44 -26,18 +30,40 @@@ import java.io.IOException import java.net.URL; import java.util.ArrayList; import java.util.List; -import org.apache.rocketmq.client.producer.DefaultMQProducer; -import org.apache.rocketmq.client.producer.SendResult; -import org.apache.rocketmq.common.message.Message; -import org.apache.rocketmq.remoting.common.RemotingHelper; +import java.util.concurrent.atomic.AtomicLong; - import org.apache.rocketmq.client.producer.DefaultMQProducer; - import org.apache.rocketmq.common.message.Message; - import org.apache.rocketmq.remoting.common.RemotingHelper; - public class ProducerFromFile { + private static final DefaultMQProducer producer = new DefaultMQProducer("test-group"); + private static final AtomicLong count = new AtomicLong(0); + private static boolean init = false; - public static void produce(String filePath, String nameServ, String topic) { - try { - DefaultMQProducer producer = new DefaultMQProducer("test-group"); + private static synchronized void initProducer(String nameServ) throws Throwable { + if (!init) { producer.setNamesrvAddr(nameServ); producer.start(); + init = true; + } + } + + public static void produceInLoop(String filePath, String nameServ, String topic, long interval) { + while (true) { + try { + produce(filePath, nameServ, topic, false); + + Thread.sleep(interval); + + if (count.get() % 500 == 0) { + System.out.println("send message num: " + count.get()); + } + } catch (Throwable t) { + t.printStackTrace(); + } + } + } + + public static void produce(String filePath, String nameServ, String topic, boolean shutdown) { + try { + initProducer(nameServ); List<String> result = ProducerFromFile.read(filePath); diff --cc rocketmq-streams-window/pom.xml index 40fcbf94,7a2be2dd..3ce4dac8 --- a/rocketmq-streams-window/pom.xml +++ b/rocketmq-streams-window/pom.xml @@@ -48,10 -50,6 +50,13 @@@ <groupId>org.rocksdb</groupId> <artifactId>rocksdbjni</artifactId> </dependency> - - + <dependency> + <groupId>org.apache.rocketmq</groupId> + <artifactId>rocketmq-tools</artifactId> + </dependency> ++ <dependency> ++ <groupId>org.apache.rocketmq</groupId> ++ <artifactId>rocketmq-streams-commons</artifactId> ++ </dependency> </dependencies> </project> diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java index 00000000,39629cc1..a36309e2 mode 000000,100644..100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java @@@ -1,0 -1,76 +1,58 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.rocketmq.streams.window.minibatch; + -import com.alibaba.fastjson.JSONArray; -import com.alibaba.fastjson.JSONObject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; + import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; + import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache; + import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache; + import org.apache.rocketmq.streams.common.channel.split.ISplit; + import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.context.Message; -import org.apache.rocketmq.streams.common.context.MessageHeader; + import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator; -import org.apache.rocketmq.streams.common.utils.MapKeyUtil; -import org.apache.rocketmq.streams.common.utils.ReflectUtil; -import org.apache.rocketmq.streams.common.utils.StringUtil; -import org.apache.rocketmq.streams.script.operator.impl.AggregationScript; -import org.apache.rocketmq.streams.window.model.WindowInstance; + import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; -import org.apache.rocketmq.streams.window.operator.AbstractWindow; -import org.apache.rocketmq.streams.window.state.impl.WindowValue; -import org.apache.rocketmq.streams.window.util.ShuffleUtil; + + public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit,IMessage>> { + public static String SHUFFLE_KEY="shuffle_key"; + + + + protected transient IShuffleKeyGenerator shuffleKeyGenerator; + protected transient AbstractShuffleWindow window; + + + + + public MiniBatchMsgCache( + IMessageFlushCallBack<Pair<ISplit,IMessage>> flushCallBack, IShuffleKeyGenerator shuffleKeyGenerator, + AbstractShuffleWindow window) { + super(flushCallBack); + this.shuffleKeyGenerator=shuffleKeyGenerator; + this.window=window; + } + + + @Override protected String createSplitId(Pair<ISplit, IMessage> msg) { + return msg.getLeft().getQueueId(); + } + + @Override protected MessageCache createMessageCache() { + ShuffleMessageCache messageCache=new ShuffleMessageCache(this.flushCallBack); + messageCache.setWindow(window); + messageCache.setShuffleKeyGenerator(shuffleKeyGenerator); + return messageCache; + } + } diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java index c749bd84,fed18863..4062b1f5 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowInstance.java @@@ -116,6 -122,19 +116,10 @@@ public class WindowInstance extends Ent return windowInstance; } - /** - * 创建window instance的唯一ID - * - * @return - */ + public String createWindowInstanceId() { + return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime); + } + - public String createWindowInstanceIdWithoutSplitid() { - return MapKeyUtil.createKey(windowNameSpace, windowName, windowInstanceName, startTime, endTime); - } - public String createWindowInstanceTriggerId() { return MapKeyUtil.createKey(splitId, windowNameSpace, windowName, windowInstanceName, startTime, endTime, fireTime); } @@@ -162,7 -254,20 +165,18 @@@ * @return * @Param isWindowInstance2DB 如果是秒级窗口,可能windowinstacne不必存表,只在内存保存,可以通过这个标志设置 */ - public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, - int timeUnitAdjust, String queueId) { + public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId) { + return getOrCreateWindowInstance(window,occurTime,timeUnitAdjust,queueId,false); + } + /** + * 查询或者创建Window的实例,滑动窗口有可能返回多个,滚动窗口返回一个 + * + * @param window + * @param occurTime + * @return + * @Param isWindowInstance2DB 如果是秒级窗口,可能windowinstacne不必存表,只在内存保存,可以通过这个标志设置 + */ - public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, - int timeUnitAdjust, String queueId, boolean isCreateOnly) { ++ public static List<WindowInstance> getOrCreateWindowInstance(AbstractWindow window, Long occurTime, int timeUnitAdjust, String queueId, boolean isCreateOnly) { int windowSlideInterval = window.getSlideInterval(); int windowSizeInterval = window.getSizeInterval(); if (windowSlideInterval == 0) { @@@ -246,10 -345,9 +264,10 @@@ } } List<WindowInstance> lostInstanceList = null; + //todo 这里针对lost的都创建一次 lostInstanceList = WindowInstance.createWindowInstances(window, lostWindowTimeList, lostFireList, queueId); instanceList.addAll(lostInstanceList); - if (CollectionUtil.isNotEmpty(lostInstanceList)) { + if (CollectionUtil.isNotEmpty(lostInstanceList)&&!isCreateOnly) { for (WindowInstance windowInstance : instanceList) { List<WindowInstance> emitInstances = createEmitWindowInstance(window, windowInstance); if (emitInstances != null && emitInstances.size() > 0) { diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java index 070a40f0,5338734f..7c78f478 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java @@@ -85,8 -74,7 +100,7 @@@ public abstract class AbstractShuffleWi Set<String> splitIds = new HashSet<>(); splitIds.add(windowInstance.getSplitId()); shuffleChannel.flush(splitIds); - - return fireWindowInstance(windowInstance, windowInstance.getSplitId(), queueId2Offset); + return doFireWindowInstance(windowInstance); } /** diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java index 49d79c5b,8fd07737..eb548aa6 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractWindow.java @@@ -17,6 -17,17 +17,8 @@@ package org.apache.rocketmq.streams.window.operator; import com.alibaba.fastjson.JSONObject; -import java.util.ArrayList; -import java.util.Date; -import java.util.HashMap; -import java.util.Iterator; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; ++ + import javafx.util.Pair; import org.apache.commons.lang3.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@@ -137,8 -164,8 +145,9 @@@ public abstract class AbstractWindow ex protected boolean isLocalStorageOnly = true;//是否只用本地存储,可以提高性能,但不保证可靠性 protected String reduceSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码 protected transient IReducer reducer; - + protected transient Long maxPartitionNum = 100000000L; + protected String mapFunctionSerializeValue;//用户自定义的operator的序列化字节数组,做了base64解码 + protected transient MapFunction<JSONObject, Pair<WindowInstance, JSONObject>> mapFunction; /** * the computed column and it's process of computing */ @@@ -164,12 -191,23 +173,13 @@@ */ protected transient String WINDOW_NAME; - /** - * 内部使用,定期检查窗口有没有触发 - */ - //protected transient ScheduledExecutorService fireWindowInstanceChecker =new ScheduledThreadPoolExecutor(3); - - // protected transient ExecutorService deleteService = Executors.newSingleThreadExecutor(); protected volatile transient WindowCache windowCache; - protected transient WindowStorage storage; + protected transient IStorage storage; protected transient WindowTrigger windowFireSource; - protected transient SQLCache sqlCache; protected transient EventTimeManager eventTimeManager; + protected transient ISink contextMsgSink; - //create and save window instacne max partitionNum and window max eventTime - protected transient IWindowMaxValueManager windowMaxValueManager; - public AbstractWindow() { setType(IWindow.TYPE); } @@@ -199,8 -242,12 +209,12 @@@ byte[] bytes = Base64Utils.decode(this.reduceSerializeValue); reducer = InstantiationUtil.deserializeObject(bytes); } + if (StringUtil.isNotEmpty(this.mapFunctionSerializeValue)) { + byte[] bytes = Base64Utils.decode(this.mapFunctionSerializeValue); + this.mapFunction = InstantiationUtil.deserializeObject(bytes); + } eventTimeManager = new EventTimeManager(); - windowMaxValueManager = new WindowMaxValueManager(this, sqlCache); + return success; } @@@ -306,11 -347,11 +320,12 @@@ * @param message * @return */ - protected String generateShuffleKey(IMessage message) { + @Override + public String generateShuffleKey(IMessage message) { if (StringUtil.isEmpty(groupByFieldName)) { - return null; + return "globle_window"; } + JSONObject msg = message.getMessageBody(); String[] fieldNames = groupByFieldName.split(";"); String[] values = new String[fieldNames.length]; @@@ -773,13 -845,45 +796,53 @@@ this.maxDelay = maxDelay; } + public Long getMaxPartitionNum() { + return maxPartitionNum; + } + + public void setMaxPartitionNum(Long maxPartitionNum) { + this.maxPartitionNum = maxPartitionNum; + } + public abstract boolean supportBatchMsgFinish(); + + public ISink getContextMsgSink() { + return contextMsgSink; + } + + public String getContextMsgSinkName() { + return contextMsgSinkName; + } + + public void setContextMsgSinkName(String contextMsgSinkName) { + this.contextMsgSinkName = contextMsgSinkName; + } + + public String getMapFunctionSerializeValue() { + return mapFunctionSerializeValue; + } + + public void setMapFunctionSerializeValue(String mapFunctionSerializeValue) { + this.mapFunctionSerializeValue = mapFunctionSerializeValue; + } + + public void saveMsgContext(String queueId,WindowInstance windowInstance, List<IMessage> messages) { + if(this.mapFunction!=null&&this.contextMsgSink!=null){ + if(messages!=null){ + for(IMessage message:messages){ + JSONObject msg=message.getMessageBody(); + try { + msg=this.mapFunction.map(new Pair(windowInstance,msg)); + Message copyMsg=new Message(msg); + copyMsg.getHeader().setQueueId(queueId); + copyMsg.getHeader().setOffset(message.getHeader().getOffset()); + this.contextMsgSink.batchAdd(copyMsg); + } catch (Exception e) { + throw new RuntimeException("save window context msg error ",e); + } + } + this.contextMsgSink.flush(); + } + } + } } diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java index 65ac261b,8da03987..0c10a9bf --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/SessionOperator.java @@@ -515,13 -481,12 +515,13 @@@ public class SessionOperator extends Wi store(lastValueMap, instance, queueId); } + @Override public long incrementAndGetSplitNumber(WindowInstance instance, String shuffleId) { - long number = super.incrementAndGetSplitNumber(instance, shuffleId); - if (number > 900000000) { - this.getWindowMaxValueManager().resetSplitNum(instance, shuffleId); + long numer = super.incrementAndGetSplitNumber(instance, shuffleId); + if (numer > 900000000) { + this.storage.putMaxPartitionNum(shuffleId, instance.getWindowInstanceId(), numer); } - return number; + return numer; } - } + } diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java index 65f326f6,d2806081..de6dce22 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/impl/WindowOperator.java @@@ -16,113 -16,155 +16,119 @@@ */ package org.apache.rocketmq.streams.window.operator.impl; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicLong; -import org.apache.rocketmq.streams.common.channel.split.ISplit; ++ import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.context.MessageOffset; -import org.apache.rocketmq.streams.common.utils.CollectionUtil; -import org.apache.rocketmq.streams.common.utils.DateUtil; import org.apache.rocketmq.streams.common.utils.MapKeyUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; -import org.apache.rocketmq.streams.db.driver.batchloader.IRowOperator; -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; -import org.apache.rocketmq.streams.script.operator.impl.AggregationScript; import org.apache.rocketmq.streams.window.debug.DebugWriter; -import org.apache.rocketmq.streams.window.model.FunctionExecutor; import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; -import org.apache.rocketmq.streams.window.operator.AbstractWindow; -import org.apache.rocketmq.streams.window.sqlcache.impl.FiredNotifySQLElement; import org.apache.rocketmq.streams.window.state.WindowBaseValue; import org.apache.rocketmq.streams.window.state.impl.WindowValue; -import org.apache.rocketmq.streams.window.storage.IWindowStorage; -import org.apache.rocketmq.streams.window.storage.ShufflePartitionManager; -import org.apache.rocketmq.streams.window.storage.WindowStorage.WindowBaseValueIterator; - -public class WindowOperator extends AbstractShuffleWindow { +import org.apache.rocketmq.streams.window.storage.IteratorWrap; +import org.apache.rocketmq.streams.window.storage.RocksdbIterator; +import org.apache.rocketmq.streams.window.storage.WindowType; - import java.util.*; - private static final String ORDER_BY_SPLIT_NUM = "_order_by_split_num_";//key=_order;queueid,windowinstanceid,partitionNum ++import java.util.ArrayList; ++import java.util.Comparator; ++import java.util.HashMap; ++import java.util.List; ++import java.util.Map; +import java.util.Map.Entry; +import java.util.stream.Collectors; +public class WindowOperator extends AbstractShuffleWindow { public WindowOperator() { super(); } - protected transient boolean supportQuickStoreModel=false; - protected transient List<String> schema=new ArrayList<>(); + @Override + public int doFireWindowInstance(WindowInstance instance) { + String windowInstanceId = instance.getWindowInstanceId(); + String queueId = instance.getSplitId(); - @Deprecated - public WindowOperator(String timeFieldName, int windowPeriodMinute) { - super(); - super.timeFieldName = timeFieldName; - super.sizeInterval = windowPeriodMinute; - } + RocksdbIterator<WindowBaseValue> rocksdbIterator = storage.getWindowBaseValue(queueId, windowInstanceId, WindowType.NORMAL_WINDOW, null); - @Deprecated - public WindowOperator(String timeFieldName, int windowPeriodMinute, String calFieldName) { - super(); - super.timeFieldName = timeFieldName; - super.sizeInterval = windowPeriodMinute; - } + ArrayList<WindowValue> windowValues = new ArrayList<>(); + while (rocksdbIterator.hasNext()) { + IteratorWrap<WindowBaseValue> next = rocksdbIterator.next(); + WindowValue data = (WindowValue)next.getData(); + windowValues.add(data); + } + + windowValues.sort(Comparator.comparingLong(WindowBaseValue::getPartitionNum)); - public WindowOperator(int sizeInterval, String groupByFieldName, Map<String, String> select) { - this.sizeInterval = sizeInterval; - this.slideInterval = sizeInterval; - this.groupByFieldName = groupByFieldName; - this.setSelectMap(select); + int fireCount = sendBatch(windowValues, queueId, 0); + + clearFire(instance); + + return fireCount; } - protected transient AtomicInteger shuffleCount = new AtomicInteger(0); - protected transient AtomicInteger fireCountAccumulator = new AtomicInteger(0); - protected transient AtomicLong fireCost=new AtomicLong(0); - @Override - public int fireWindowInstance(WindowInstance instance, String queueId, Map<String, String> queueId2Offset) { - List<WindowValue> windowValues = new ArrayList<>(); - int fireCount = 0; - //long startTime = System.currentTimeMillis(); - //int sendCost = 0; - // int currentCount = 0; - //for(String queueId:currentQueueIds){ - WindowBaseValueIterator<WindowBaseValue> it = storage.loadWindowInstanceSplitData(getOrderBypPrefix(), queueId, instance.createWindowInstanceId(), null, getWindowBaseValueClass()); - if (queueId2Offset != null) { - String offset = queueId2Offset.get(queueId); - if (StringUtil.isNotEmpty(offset)) { - it.setPartitionNum(Long.valueOf(offset)); - } + private int sendBatch(List<WindowValue> windowValues, String queueId, int fireCount) { + if (windowValues == null || windowValues.size() == 0) { + return fireCount; } - while (it.hasNext()) { - WindowBaseValue windowBaseValue = it.next(); - if (windowBaseValue == null) { - continue; - } - WindowValue windowValue = (WindowValue) windowBaseValue; - - Integer currentValue = getValue(windowValue, "total"); - - fireCountAccumulator.addAndGet(currentValue); - windowValues.add((WindowValue) windowBaseValue); - if (windowValues.size() >= windowCache.getBatchSize()) { - long sendFireCost = System.currentTimeMillis(); - sendFireMessage(windowValues, queueId); - //sendCost += (System.currentTimeMillis() - sendFireCost); - fireCount += windowValues.size(); - windowValues = new ArrayList<>(); - } - } - if (windowValues.size() > 0) { - long sendFireCost = System.currentTimeMillis(); + if (windowValues.size() <= windowCache.getBatchSize()) { sendFireMessage(windowValues, queueId); - // sendCost += (System.currentTimeMillis() - sendFireCost); + fireCount += windowValues.size(); + + return fireCount; + } else { + ArrayList<WindowValue> temp = new ArrayList<>(); + for (int i = 0; i < windowCache.getBatchSize(); i++) { + temp.add(windowValues.remove(i)); + } + + sendFireMessage(temp, queueId); + + return sendBatch(windowValues, queueId, fireCount + windowCache.getBatchSize()); } - clearFire(instance); - this.sqlCache.addCache(new FiredNotifySQLElement(queueId, instance.createWindowInstanceId())); - //long cost= this.fireCost.addAndGet(System.currentTimeMillis()-startTime); - // System.out.println("fire cost is "+cost+" "+ DateUtil.getCurrentTimeString()); - return fireCount; } - protected transient Map<String, Integer> shuffleWindowInstanceId2MsgCount = new HashMap<>(); - protected transient int windowvaluecount = 0; - protected transient AtomicLong shuffleCost=new AtomicLong(0); + @Override public void shuffleCalculate(List<IMessage> messages, WindowInstance instance, String queueId) { + Long startTime=System.currentTimeMillis(); DebugWriter.getDebugWriter(getConfigureName()).writeShuffleCalcultateReceveMessage(instance, messages, queueId); + List<String> sortKeys = new ArrayList<>(); Map<String, List<IMessage>> groupBy = groupByGroupName(messages, sortKeys); - Set<String> groupByKeys = groupBy.keySet(); - List<String> storeKeys = new ArrayList<>(); - for (String groupByKey : groupByKeys) { - String storeKey = createStoreKey(queueId, groupByKey, instance); - storeKeys.add(storeKey); + + RocksdbIterator<WindowBaseValue> windowBaseValue = storage.getWindowBaseValue(queueId, instance.getWindowInstanceId(), WindowType.NORMAL_WINDOW, null); + + ArrayList<WindowBaseValue> windowValues = new ArrayList<>(); + while (windowBaseValue.hasNext()) { + IteratorWrap<WindowBaseValue> next = windowBaseValue.next(); + windowValues.add(next.getData()); } - Map<String, WindowBaseValue> allWindowValues = new HashMap<>(); - //从存储中,查找window value对象,value是对象的json格式 - Map<String, WindowBaseValue> existWindowValues = storage.multiGet(getWindowBaseValueClass(), storeKeys, instance.createWindowInstanceId(), queueId); - // Iterator<Entry<String, List<IMessage>>> it = groupBy.entrySet().iterator(); - for (String groupByKey : sortKeys) { + Map<String, List<WindowValue>> temp = windowValues.stream().map((value) -> (WindowValue) value).collect(Collectors.groupingBy(WindowValue::getMsgKey)); + + Map<String, List<WindowValue>> groupByMsgKey = new HashMap<>(temp); + + List<WindowValue> allWindowValues = new ArrayList<>(); + + //处理不同groupBy的message + for (String groupByKey : sortKeys) { List<IMessage> msgs = groupBy.get(groupByKey); String storeKey = createStoreKey(queueId, groupByKey, instance); - WindowValue windowValue = (WindowValue) existWindowValues.get(storeKey); - ; - if (windowValue == null) { - windowvaluecount++; + + //msgKey 为唯一键 + List<WindowValue> windowValueList = groupByMsgKey.get(storeKey); + WindowValue windowValue; + if (windowValueList == null || windowValueList.size() == 0) { windowValue = createWindowValue(queueId, groupByKey, instance); - // windowValue.setOrigOffset(msgs.get(0).getHeader().getOffset()); + } else { + windowValue = windowValueList.get(0); } - allWindowValues.put(storeKey, windowValue); - windowValue.incrementUpdateVersion(); - Integer origValue = getValue(windowValue, "total"); + allWindowValues.add(windowValue); + windowValue.incrementUpdateVersion(); if (msgs != null) { for (IMessage message : msgs) { diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java index 9c1c0af6,5eb3b784..f356a286 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/join/JoinWindow.java @@@ -60,11 -62,16 +60,10 @@@ public class JoinWindow extends Abstrac protected String joinType;//join类型,值为INNER,LEFT protected String expression;//条件表达式。在存在非等值比较时使用 - protected transient DBOperator joinOperator = new DBOperator(); - protected String rightDependentTableName; - // @Override - // protected void addPropertyToMessage(IMessage oriMessage, JSONObject oriJson){ - // oriJson.put("AbstractWindow", this); - // - // } - @Override - protected int fireWindowInstance(WindowInstance instance, String shuffleId, Map<String, String> queueId2Offsets) { + protected int doFireWindowInstance(WindowInstance instance) { + //todo 只是清理吗? clearFire(instance); return 0; } @@@ -359,8 -422,18 +358,8 @@@ } @Override - protected String generateShuffleKey(IMessage message) { + public String generateShuffleKey(IMessage message) { - String routeLabel =null; - String lable = message.getHeader().getMsgRouteFromLable(); - if (lable != null) { - if (lable.equals(rightDependentTableName)) { - routeLabel = MessageHeader.JOIN_RIGHT; - } else { - routeLabel = MessageHeader.JOIN_LEFT; - } - } else { - throw new RuntimeException("can not dipatch message, need route label " + toJson()); - } + String routeLabel = message.getHeader().getMsgRouteFromLable(); String messageKey = generateKey(message.getMessageBody(), routeLabel, leftJoinFieldNames, rightJoinFieldNames); return messageKey; } diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java index 6c4659b3,d0e499f2..0a3845fd --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/AbstractSystemChannel.java @@@ -54,11 -55,12 +55,12 @@@ public abstract class AbstractSystemCha protected static final String CHANNEL_PROPERTY_KEY_PREFIX = "CHANNEL_PROPERTY_KEY_PREFIX"; protected static final String CHANNEL_TYPE = "CHANNEL_TYPE"; - protected ISource consumer; + protected ISource<?> consumer; protected AbstractSupportShuffleSink producer; protected Map<String, String> channelConfig = new HashMap<>(); - protected volatile boolean hasCreateShuffleChannel = false; + protected boolean hasCreateShuffleChannel = false; + protected transient AtomicBoolean hasStart = new AtomicBoolean(false); public void startChannel() { if (consumer == null) { return; diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java index 5139f707,054a418d..7b4d8b4f --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleCache.java @@@ -22,11 -22,11 +22,11 @@@ import java.util.Collections import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.Future; - import org.apache.commons.lang3.tuple.Pair; + import org.apache.rocketmq.streams.common.channel.sink.AbstractSink; import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; +import org.apache.rocketmq.streams.common.context.MessageOffset; import org.apache.rocketmq.streams.window.debug.DebugWriter; import org.apache.rocketmq.streams.window.model.WindowCache; import org.apache.rocketmq.streams.window.model.WindowInstance; @@@ -36,9 -37,8 +36,9 @@@ import org.apache.rocketmq.streams.wind /** * save receiver messages into cachefilter when checkpoint/autoflush/flush, process cachefilter message */ - public class ShuffleCache extends WindowCache { + public class ShuffleCache extends AbstractSink { protected AbstractShuffleWindow window; + private HashMap<String, Boolean> hasLoad = new HashMap<>(); public ShuffleCache(AbstractShuffleWindow window) { this.window = window; @@@ -64,64 -54,23 +64,65 @@@ for (Pair<String, String> queueIdAndInstanceKey : keys) { String queueId = queueIdAndInstanceKey.getLeft(); String windowInstanceId = queueIdAndInstanceKey.getRight(); + List<IMessage> messages = instance2Messages.get(queueIdAndInstanceKey); + WindowInstance windowInstance = windowInstanceMap.get(windowInstanceId); + DebugWriter.getDebugWriter(window.getConfigureName()).writeShuffleReceive(window, messages, windowInstance); + + stateMustLoad(queueId); + window.shuffleCalculate(messages, windowInstance, queueId); + + //保存处理进度 saveSplitProgress(queueId, messages); + window.saveMsgContext(queueId,windowInstance,messages); } - return true; + return true; } + private void stateMustLoad(String queueId) { + Boolean load = this.hasLoad.get(queueId); + if (load != null && load) { + return; + } + + //在计算之前需要异步加载状态完成 + HashMap<String, Future<?>> loadResult = this.window.getShuffleChannel().getLoadResult(); + Future<?> future = loadResult.get(queueId); + + if (future == null) { + return; + } + + try { + long before = System.currentTimeMillis(); + future.get(); + long after = System.currentTimeMillis(); + + System.out.println("message wait before state recover:[" + (after - before) + "] ms, queueId=" + queueId); + + for (String loadQueueId : loadResult.keySet()) { + hasLoad.put(loadQueueId, true); + } + } catch (Throwable t) { + throw new RuntimeException("check remote with queueId:" + queueId + ",error", t); + } + } + /** - * save consumer progress(offset)for groupby source queueId + * save consumer progress(offset)for groupby source shuffleId + * window configName: name_window_10001 + * shuffleId: shuffle_NormalTestTopic_namespace_name_broker-a_001 + * oriQueueId: NormalTestTopic2_broker-a_000 * - * @param queueId + * @param shuffleId * @param messages */ - protected void saveSplitProgress(String queueId, List<IMessage> messages) { + protected void saveSplitProgress(String shuffleId, List<IMessage> messages) { + IStorage delegator = this.window.getStorage(); + Map<String, String> queueId2OrigOffset = new HashMap<>(); Boolean isLong = false; for (IMessage message : messages) { diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index f9bd3413,9280a2e4..d834af41 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@@ -18,7 -18,18 +18,8 @@@ package org.apache.rocketmq.streams.win import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; + import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage; @@@ -42,7 -57,10 +46,8 @@@ import org.apache.rocketmq.streams.comm import org.apache.rocketmq.streams.common.utils.MapKeyUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.common.utils.TraceUtil; -import org.apache.rocketmq.streams.db.driver.orm.ORMUtil; import org.apache.rocketmq.streams.window.debug.DebugWriter; + import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache; import org.apache.rocketmq.streams.window.model.WindowCache; import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; @@@ -77,11 -90,17 +85,13 @@@ public class ShuffleChannel extends Abs protected ShuffleCache shuffleCache; - protected Map<String, ISplit> queueMap = new ConcurrentHashMap<>(); - protected List<ISplit> queueList;//所有的分片 + protected Map<String, ISplit<?, ?>> queueMap = new ConcurrentHashMap<>(); + /** + * 所有的分片 + */ + protected List<ISplit<?, ?>> queueList; - // protected NotifyChannel notfiyChannel;//负责做分片的通知管理 protected AbstractShuffleWindow window; - /** - * 当前管理的分片 - */ - private Set<String> currentQueueIds; protected transient boolean isWindowTest = false; @@@ -116,28 -126,16 +117,16 @@@ * init shuffle channel */ public void init() { - this.consumer = createSource(window.getNameSpace(), window.getConfigureName()); - - this.producer = createSink(window.getNameSpace(), window.getConfigureName()); - if (this.consumer == null || this.producer == null) { - autoCreateShuffleChannel(window.getFireReceiver().getPipeline()); - } - if (this.consumer == null) { - return; - } - if (this.consumer instanceof AbstractSource) { - ((AbstractSource) this.consumer).setJsonData(true); - } + init(this.window); if (producer != null && (queueList == null || queueList.size() == 0)) { queueList = producer.getSplitList(); - Map<String, ISplit> tmp = new ConcurrentHashMap<>(); - for (ISplit queue : queueList) { + Map<String, ISplit<?, ?>> tmp = new ConcurrentHashMap<>(); + for (ISplit<?, ?> queue : queueList) { tmp.put(queue.getQueueId(), queue); } - this.queueMap = tmp; } - isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest"); +// isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest"); } /** @@@ -467,8 -505,11 +455,8 @@@ return splitNum > 0 ? splitNum : 32; } - public Set<String> getCurrentQueueIds() { - return currentQueueIds; - } - public List<ISplit> getQueueList() { + public List<ISplit<?, ?>> getQueueList() { return queueList; } diff --cc rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java index b012b175,69b6494a..e3bb3f83 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/trigger/WindowTrigger.java @@@ -287,7 -287,7 +287,6 @@@ public class WindowTrigger extends Abst } }); for (WindowInstance windowInstance : windowInstanceList) { - System.out.println("fire by finish flag"); - // System.out.println("fire by finish flag"); fireWindowInstance(windowInstance); } }
