This is an automated email from the ASF dual-hosted git repository. karp pushed a commit to branch snapshot-1.0.4 in repository https://gitbox.apache.org/repos/asf/rocketmq-streams.git
commit 24b5bf902a1f1faa15b1eabd0df6c44d0d9f9d07 Author: 维章 <[email protected]> AuthorDate: Mon May 30 15:39:02 2022 +0800 remove mini batch --- .../apache/rocketmq/streams/sink/RocketMQSink.java | 30 ++-- .../window/minibatch/MiniBatchMsgCache.java | 58 ------- .../rocketmq/streams/window/model/WindowCache.java | 182 +++++++++++++-------- .../window/operator/AbstractShuffleWindow.java | 1 - .../streams/window/shuffle/ShuffleChannel.java | 2 - 5 files changed, 128 insertions(+), 145 deletions(-) diff --git a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java index 1de92e31..d702053e 100644 --- a/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java +++ b/rocketmq-streams-channel-rocketmq/src/main/java/org/apache/rocketmq/streams/sink/RocketMQSink.java @@ -241,30 +241,22 @@ public class RocketMQSink extends AbstractSupportShuffleSink { } @Override - public List<ISplit<?, ?>> getSplitList() { + public List<ISplit<?,?>> getSplitList() { initProducer(); - List<ISplit<?, ?>> messageQueues = new ArrayList<>(); - List<MessageQueue> metaqQueueSet = new ArrayList<>(); + List<ISplit<?,?>> messageQueues = new ArrayList<>(); try { - if (messageQueues == null || messageQueues.size() == 0) { - try { - metaqQueueSet = producer.fetchPublishMessageQueues(topic); - }catch (Exception e) { - producer.send(new Message(topic, "test", "test".getBytes(StandardCharsets.UTF_8))); - metaqQueueSet = producer.fetchPublishMessageQueues(topic); - } - List<ISplit<?, ?>> queueList = new ArrayList<>(); - for (MessageQueue queue : metaqQueueSet) { - RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue); - queueList.add(rocketMQMessageQueue); + List<MessageQueue> messageQueueSet = producer.fetchPublishMessageQueues(topic); + List<ISplit<?,?>> queueList = new ArrayList<>(); + for (MessageQueue queue : messageQueueSet) { + RocketMQMessageQueue rocketMQMessageQueue = new RocketMQMessageQueue(queue); + queueList.add(rocketMQMessageQueue); - } - queueList.sort((Comparator<ISplit>) Comparable::compareTo); - messageQueues = queueList; } - } catch (Exception e) { - throw new RuntimeException(e); + queueList.sort((Comparator<ISplit>) Comparable::compareTo); + messageQueues = queueList; + } catch (MQClientException e) { + return messageQueues; } return messageQueues; diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java deleted file mode 100644 index a36309e2..00000000 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/minibatch/MiniBatchMsgCache.java +++ /dev/null @@ -1,58 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.rocketmq.streams.window.minibatch; - -import org.apache.commons.lang3.tuple.Pair; -import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; -import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache; -import org.apache.rocketmq.streams.common.channel.sinkcache.impl.MessageCache; -import org.apache.rocketmq.streams.common.channel.split.ISplit; -import org.apache.rocketmq.streams.common.context.IMessage; -import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator; -import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; - -public class MiniBatchMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit,IMessage>> { - public static String SHUFFLE_KEY="shuffle_key"; - - - - protected transient IShuffleKeyGenerator shuffleKeyGenerator; - protected transient AbstractShuffleWindow window; - - - - - public MiniBatchMsgCache( - IMessageFlushCallBack<Pair<ISplit,IMessage>> flushCallBack, IShuffleKeyGenerator shuffleKeyGenerator, - AbstractShuffleWindow window) { - super(flushCallBack); - this.shuffleKeyGenerator=shuffleKeyGenerator; - this.window=window; - } - - - @Override protected String createSplitId(Pair<ISplit, IMessage> msg) { - return msg.getLeft().getQueueId(); - } - - @Override protected MessageCache createMessageCache() { - ShuffleMessageCache messageCache=new ShuffleMessageCache(this.flushCallBack); - messageCache.setWindow(window); - messageCache.setShuffleKeyGenerator(shuffleKeyGenerator); - return messageCache; - } -} diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java index c53a50ff..8ce1232f 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/model/WindowCache.java @@ -19,9 +19,14 @@ package org.apache.rocketmq.streams.window.model; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.atomic.AtomicLong; + import org.apache.commons.lang3.tuple.MutablePair; import org.apache.commons.lang3.tuple.Pair; import org.apache.commons.logging.Log; @@ -29,30 +34,26 @@ import org.apache.commons.logging.LogFactory; import org.apache.rocketmq.streams.common.batchsystem.BatchFinishMessage; import org.apache.rocketmq.streams.common.channel.sink.AbstractSink; import org.apache.rocketmq.streams.common.channel.sinkcache.IMessageFlushCallBack; +import org.apache.rocketmq.streams.common.channel.sinkcache.impl.AbstractMultiSplitMessageCache; import org.apache.rocketmq.streams.common.channel.split.ISplit; import org.apache.rocketmq.streams.common.component.ComponentCreator; import org.apache.rocketmq.streams.common.context.IMessage; import org.apache.rocketmq.streams.common.context.Message; import org.apache.rocketmq.streams.common.topology.model.IWindow; -import org.apache.rocketmq.streams.common.topology.shuffle.IShuffleKeyGenerator; import org.apache.rocketmq.streams.common.utils.CompressUtil; -import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache; +import org.apache.rocketmq.streams.common.utils.StringUtil; +import org.apache.rocketmq.streams.window.debug.DebugWriter; import org.apache.rocketmq.streams.window.shuffle.ShuffleChannel; -import org.apache.rocketmq.streams.window.util.ShuffleUtil; /** * 缓存数据,flush时,刷新完成数据落盘 */ -public abstract class WindowCache extends - AbstractSink implements IWindow.IWindowCheckpoint { - +public abstract class WindowCache extends AbstractSink implements IWindow.IWindowCheckpoint { private static final Log LOG = LogFactory.getLog(WindowCache.class); - public static final String SPLIT_SIGN = "##"; public static final String IS_COMPRESSION_MSG = "_is_compress_msg"; public static final String COMPRESSION_MSG_DATA = "_compress_msg"; - public static final String MSG_FROM_SOURCE = "msg_from_source"; public static final String ORIGIN_OFFSET = "origin_offset"; public static final String ORIGIN_QUEUE_ID = "origin_queue_id"; @@ -66,94 +67,105 @@ public abstract class WindowCache extends public static final String SHUFFLE_KEY = "SHUFFLE_KEY"; public static final String ORIGIN_MESSAGE_TRACE_ID = "origin_request_id"; + protected transient boolean isWindowTest = false; + protected transient AtomicLong COUNT = new AtomicLong(0); /** * 分片转发channel */ protected transient ShuffleChannel shuffleChannel; - public void initMiniBatch() { - shuffleMsgCache = new MiniBatchMsgCache(new WindowCache.MutilMsgMergerAndCompressFlushCallBack(),(IShuffleKeyGenerator) shuffleChannel.getWindow(),shuffleChannel.getWindow()); - - - shuffleMsgCache.openAutoFlush(); - } - -// protected transient AtomicLong insertCount=new AtomicLong(0); -// protected transient AtomicLong shuffleCount=new AtomicLong(0); -// protected transient AtomicLong SUM=new AtomicLong(0); - - protected class MutilMsgMergerAndCompressFlushCallBack implements IMessageFlushCallBack<Pair<ISplit, IMessage>> { + protected class ShuffleMsgCache extends AbstractMultiSplitMessageCache<Pair<ISplit, JSONObject>> { + + public ShuffleMsgCache() { + super(new IMessageFlushCallBack<Pair<ISplit, JSONObject>>() { + @Override + public boolean flushMessage(List<Pair<ISplit, JSONObject>> messages) { + if (messages == null || messages.size() == 0) { + return true; + } + ISplit split = messages.get(0).getLeft(); + JSONObject jsonObject = messages.get(0).getRight(); + JSONArray allMsgs = shuffleChannel.getMsgs(jsonObject); + for (int i = 1; i < messages.size(); i++) { + Pair<ISplit, JSONObject> pair = messages.get(i); + JSONObject msg = pair.getRight(); + JSONArray jsonArray = shuffleChannel.getMsgs(msg); + if (jsonArray != null) { + allMsgs.addAll(jsonArray); + } + } + JSONObject zipJsonObject = new JSONObject(); + zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString())); + zipJsonObject.put(IS_COMPRESSION_MSG, true); + shuffleChannel.getProducer().batchAdd(new Message(zipJsonObject), split); + shuffleChannel.getProducer().flush(split.getQueueId()); + return true; + } + }); + } @Override - public boolean flushMessage(List<Pair<ISplit, IMessage>> messages) { - if (messages == null || messages.size() == 0) { - return true; - } - long start=System.currentTimeMillis(); - ISplit split = messages.get(0).getLeft(); - JSONArray allMsgs =new JSONArray(); - long sum=0; - for (int i = 0; i < messages.size(); i++) { - Pair<ISplit, IMessage> pair = messages.get(i); - IMessage message = pair.getRight(); - allMsgs.add(message.getMessageBody()); - // sum=SUM.addAndGet(message.getMessageBody().getLong("total")); - } - //System.out.println("before shuffle sum is "+sum); - JSONObject jsonObject=shuffleChannel.createMsg(allMsgs,split); -// JSONObject zipJsonObject = new JSONObject(); -// zipJsonObject.put(COMPRESSION_MSG_DATA, CompressUtil.gZip(jsonObject.toJSONString())); -// zipJsonObject.put(IS_COMPRESSION_MSG, true); - shuffleChannel.getProducer().batchAdd(new Message(jsonObject), split); - shuffleChannel.getProducer().flush(split.getQueueId()); - long cost=System.currentTimeMillis()-start; - // shuffleCount.addAndGet(cost); - return true; + protected String createSplitId(Pair<ISplit, JSONObject> msg) { + return msg.getLeft().getQueueId(); } } - - protected transient MiniBatchMsgCache shuffleMsgCache ; + protected transient ShuffleMsgCache shuffleMsgCache = new ShuffleMsgCache(); @Override protected boolean initConfigurable() { - + shuffleMsgCache = new ShuffleMsgCache(); + shuffleMsgCache.setBatchSize(1000); + shuffleMsgCache.setAutoFlushSize(100); + shuffleMsgCache.setAutoFlushTimeGap(1000); + shuffleMsgCache.openAutoFlush(); isWindowTest = ComponentCreator.getPropertyBooleanValue("window.fire.isTest"); return super.initConfigurable(); } @Override protected boolean batchInsert(List<IMessage> messageList) { - long start=System.currentTimeMillis(); - for (IMessage msg : messageList) { - String shuffleKey = generateShuffleKey(msg); - IMessage message= ShuffleUtil.createShuffleMsg(msg,shuffleKey); - if(message==null){ - continue; + Map<Integer, JSONArray> shuffleMap = translateToShuffleMap(messageList); + if (shuffleMap != null && shuffleMap.size() > 0) { + Set<String> splitIds = new HashSet<>(); + + for (Map.Entry<Integer, JSONArray> entry : shuffleMap.entrySet()) { + ISplit split = shuffleChannel.getSplit(entry.getKey()); + JSONObject msg = shuffleChannel.createMsg(entry.getValue(), split); + + shuffleMsgCache.addCache(new MutablePair<>(split, msg)); + List<IMessage> messages = new ArrayList<>(); + splitIds.add(split.getQueueId()); + + if (DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).isOpenDebug()) { + JSONArray jsonArray = entry.getValue(); + for (int i = 0; i < jsonArray.size(); i++) { + Message message = new Message(jsonArray.getJSONObject(i)); + message.getHeader().setQueueId(jsonArray.getJSONObject(i).getString(ORIGIN_QUEUE_ID)); + message.getHeader().setOffset(jsonArray.getJSONObject(i).getLong(ORIGIN_OFFSET)); + messages.add(message); + + } + DebugWriter.getDebugWriter(shuffleChannel.getWindow().getConfigureName()).writeWindowCache(shuffleChannel.getWindow(), messages, split.getQueueId()); + } + } - addPropertyToMessage(msg, message.getMessageBody()); - Integer index = shuffleChannel.hash(shuffleKey); - ISplit split = shuffleChannel.getSplit(index); - shuffleMsgCache.addCache(new MutablePair(split, message)); + } if (isWindowTest) { long count = COUNT.addAndGet(messageList.size()); System.out.println(shuffleChannel.getWindow().getConfigureName() + " send shuffle msg count is " + count); shuffleMsgCache.flush(); } - long cost=System.currentTimeMillis()-start; - //shuffleCount.addAndGet(cost); return true; } @Override public void finishBatchMsg(BatchFinishMessage batchFinishMessage) { - long start=System.currentTimeMillis(); if (shuffleChannel != null && shuffleChannel.getProducer() != null) { - this.flush(); - shuffleMsgCache.flush(); + shuffleChannel.getProducer().flush(); for (ISplit split : shuffleChannel.getQueueList()) { IMessage message = batchFinishMessage.getMsg().deepCopy(); message.getMessageBody().put(ORIGIN_QUEUE_ID, message.getHeader().getQueueId()); @@ -161,11 +173,51 @@ public abstract class WindowCache extends } shuffleChannel.getProducer().flush(); } - // System.out.println("insert cost is "+insertCount.get()+" shuffle cost is "+shuffleCount.get()+" finish batch cost is "+(System.currentTimeMillis()-start)); } + /** + * 对接收的消息按照不同shuffle key进行分组 + * + * @param messages + * @return + */ + protected Map<Integer, JSONArray> translateToShuffleMap(List<IMessage> messages) { + Map<Integer, JSONArray> shuffleMap = new HashMap<>(); + for (IMessage msg : messages) { + if (msg.getHeader().isSystemMessage()) { + continue; + } + String shuffleKey = generateShuffleKey(msg); + if (StringUtil.isEmpty(shuffleKey)) { + shuffleKey = "<null>"; + LOG.debug("there is no group by value in message! " + msg.getMessageBody().toString()); + //continue; + } + Integer index = shuffleChannel.hash(shuffleKey); + JSONObject body = msg.getMessageBody(); + String offset = msg.getHeader().getOffset(); + String queueId = msg.getHeader().getQueueId(); + + body.put(ORIGIN_OFFSET, offset); + body.put(ORIGIN_QUEUE_ID, queueId); + body.put(ORIGIN_QUEUE_IS_LONG, msg.getHeader().getMessageOffset().isLongOfMainOffset()); + body.put(ORIGIN_MESSAGE_HEADER, JSONObject.toJSONString(msg.getHeader())); + body.put(ORIGIN_MESSAGE_TRACE_ID, msg.getHeader().getTraceId()); + body.put(SHUFFLE_KEY, shuffleKey); + + addPropertyToMessage(msg, body); + + JSONArray jsonArray = shuffleMap.get(index); + if (jsonArray == null) { + jsonArray = new JSONArray(); + shuffleMap.put(index, jsonArray); + } + jsonArray.add(body); + } + return shuffleMap; + } /** * 根据message生成shuffle key @@ -195,8 +247,8 @@ public abstract class WindowCache extends return shuffleChannel; } - public MiniBatchMsgCache getShuffleMsgCache() { - return this.shuffleMsgCache; + public ShuffleMsgCache getShuffleMsgCache() { + return shuffleMsgCache; } public void setShuffleChannel(ShuffleChannel shuffleChannel) { diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java index 7c78f478..ea3b923a 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/operator/AbstractShuffleWindow.java @@ -86,7 +86,6 @@ public abstract class AbstractShuffleWindow extends AbstractWindow { this.shuffleChannel.init(); windowCache.setBatchSize(5000); windowCache.setShuffleChannel(shuffleChannel); - windowCache.initMiniBatch(); shuffleChannel.startChannel(); hasCreated.set(true); } diff --git a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java index d834af41..107f1ce5 100644 --- a/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java +++ b/rocketmq-streams-window/src/main/java/org/apache/rocketmq/streams/window/shuffle/ShuffleChannel.java @@ -47,7 +47,6 @@ import org.apache.rocketmq.streams.common.utils.MapKeyUtil; import org.apache.rocketmq.streams.common.utils.StringUtil; import org.apache.rocketmq.streams.common.utils.TraceUtil; import org.apache.rocketmq.streams.window.debug.DebugWriter; -import org.apache.rocketmq.streams.window.minibatch.MiniBatchMsgCache; import org.apache.rocketmq.streams.window.model.WindowCache; import org.apache.rocketmq.streams.window.model.WindowInstance; import org.apache.rocketmq.streams.window.operator.AbstractShuffleWindow; @@ -61,7 +60,6 @@ import java.util.Properties; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Future; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import static org.apache.rocketmq.streams.window.model.WindowCache.ORIGIN_MESSAGE_TRACE_ID;
