This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b7480e1a89 [Fix][connector-rocketmq] commit a correct offset to broker
& reduce ThreadInterruptedException log (#6668)
b7480e1a89 is described below
commit b7480e1a890040479fb7c39385d811df1508f61a
Author: YalikWang <[email protected]>
AuthorDate: Tue Apr 16 10:03:13 2024 +0800
[Fix][connector-rocketmq] commit a correct offset to broker & reduce
ThreadInterruptedException log (#6668)
---
.../rocketmq/source/RocketMqConsumerThread.java | 30 +++++++
.../rocketmq/source/RocketMqSourceReader.java | 94 ++++++++++------------
.../e2e/connector/rocketmq/RocketMqIT.java | 45 +++++++++++
.../rocketmq-source_tex_with_offset_check.conf | 72 +++++++++++++++++
4 files changed, 189 insertions(+), 52 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
index bfd34c3036..fcdef8bb94 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -23,7 +23,10 @@ import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
+import java.util.Collections;
+import java.util.Objects;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
@@ -33,6 +36,11 @@ public class RocketMqConsumerThread implements Runnable {
private final ConsumerMetadata metadata;
private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks;
+    /**
+     * The message queue most recently handed to the consumer's assign call; null until the first
+     * assign. Compared against each incoming split's queue to avoid needless re-assignment.
+     */
+    private MessageQueue assignedMessageQueue;
+
+    /**
+     * Offset of the last record that has been polled. This is NOT the committed offset; it only
+     * tracks polling progress so that assign can detect a split that resumes right after it.
+     * NOTE(review): the initial -2 presumably acts as a sentinel that cannot equal
+     * {@code startOffset - 1} for any non-negative start offset — confirm intent.
+     */
+    private long lastPolledOffset = -2;
+
public RocketMqConsumerThread(ConsumerMetadata metadata) {
this.metadata = metadata;
this.tasks = new LinkedBlockingQueue<>();
@@ -70,4 +78,26 @@ public class RocketMqConsumerThread implements Runnable {
public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
return tasks;
}
+
+    /**
+     * Binds the consumer to the split's message queue and repositions it when needed.
+     *
+     * <p>The consumer is (re)assigned only when the split's queue differs from the one assigned
+     * previously. A seek to the split's start offset is considered when the queue was just
+     * switched, or when the start offset does not directly follow the last polled offset. The
+     * seek is skipped for negative start offsets and when the offset returned by the consumer's
+     * committed call already equals the start offset.
+     *
+     * @param sourceSplit split whose message queue and start offset drive the assignment
+     * @throws MQClientException propagated from the consumer's assign/committed/seek calls
+     */
+    public void assign(RocketMqSourceSplit sourceSplit) throws MQClientException {
+        boolean queueSwitched =
+                assignedMessageQueue == null
+                        || !Objects.equals(assignedMessageQueue, sourceSplit.getMessageQueue());
+        if (queueSwitched) {
+            assignedMessageQueue = sourceSplit.getMessageQueue();
+            consumer.assign(Collections.singleton(assignedMessageQueue));
+        }
+
+        long startOffset = sourceSplit.getStartOffset();
+        boolean resumesAfterLastPoll = lastPolledOffset == startOffset - 1;
+        if (!queueSwitched && resumesAfterLastPoll) {
+            // Same queue and the split continues right after the last polled record:
+            // leave the consumer's position untouched.
+            return;
+        }
+        if (startOffset < 0) {
+            // A negative start offset is never seeked to.
+            return;
+        }
+        Long committedOffset = consumer.committed(assignedMessageQueue);
+        if (!Objects.equals(committedOffset, startOffset)) {
+            consumer.seek(assignedMessageQueue, startOffset);
+        }
+    }
+
+    /**
+     * Records the queue offset of the last polled record, so that the next {@link #assign} call
+     * can tell whether a split simply continues where polling stopped.
+     *
+     * @param offset queue offset of the last polled record
+     */
+    public void markLastPolledOffset(long offset) {
+        lastPolledOffset = offset;
+    }
}
diff --git
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 2beef96f1b..90bc8f3231 100644
---
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -18,7 +18,6 @@
package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
import org.apache.seatunnel.shade.com.google.common.collect.Maps;
-import org.apache.seatunnel.shade.com.google.common.collect.Sets;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Boundedness;
@@ -37,6 +36,7 @@ import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -50,7 +50,7 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
private static final long THREAD_WAIT_TIME = 500L;
- private final SourceReader.Context context;
+ private final Context context;
private final ConsumerMetadata metadata;
private final Set<RocketMqSourceSplit> sourceSplits;
private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
@@ -65,7 +65,7 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
public RocketMqSourceReader(
ConsumerMetadata metadata,
DeserializationSchema<SeaTunnelRow> deserializationSchema,
- SourceReader.Context context) {
+ Context context) {
this.metadata = metadata;
this.context = context;
this.sourceSplits = new HashSet<>();
@@ -115,21 +115,16 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
sourceSplit -> {
CompletableFuture<Void> completableFuture = new
CompletableFuture<>();
try {
- consumerThreads
- .get(sourceSplit.getMessageQueue())
+ RocketMqConsumerThread rocketMqConsumerThread =
+
consumerThreads.get(sourceSplit.getMessageQueue());
+ rocketMqConsumerThread
.getTasks()
.put(
consumer -> {
try {
- Set<MessageQueue>
messageQueues =
- Sets.newHashSet(
-
sourceSplit.getMessageQueue());
- consumer.assign(messageQueues);
- if
(sourceSplit.getStartOffset() >= 0) {
- consumer.seek(
-
sourceSplit.getMessageQueue(),
-
sourceSplit.getStartOffset());
- }
+
rocketMqConsumerThread.assign(sourceSplit);
+ MessageQueue
assignedMessageQueue =
+
sourceSplit.getMessageQueue();
List<MessageExt> records =
consumer.poll(
metadata.getBaseConfig()
@@ -141,47 +136,36 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
sourceSplit.getStartOffset(),
sourceSplit.getEndOffset());
}
- Map<MessageQueue,
List<MessageExt>> groupRecords =
+ List<MessageExt> messages =
records.stream()
- .collect(
-
Collectors.groupingBy(
-
record ->
-
new MessageQueue(
-
record
-
.getTopic(),
-
record
-
.getBrokerName(),
-
record
-
.getQueueId())));
- for (MessageQueue messageQueue
: messageQueues) {
- if
(!groupRecords.containsKey(messageQueue)) {
- continue;
- }
- List<MessageExt> messages =
-
groupRecords.get(messageQueue);
- for (MessageExt record :
messages) {
-
deserializationSchema.deserialize(
-
record.getBody(), output);
- if
(Boundedness.BOUNDED.equals(
-
context.getBoundedness())
- &&
record.getQueueOffset()
- >=
sourceSplit
-
.getEndOffset()) {
- break;
- }
- }
- long lastOffset = -1;
- if (!messages.isEmpty()) {
- lastOffset =
-
messages.get(messages.size() - 1)
-
.getQueueOffset();
-
sourceSplit.setStartOffset(lastOffset);
- }
-
- if (lastOffset >=
sourceSplit.getEndOffset()) {
-
sourceSplit.setEndOffset(lastOffset);
+ .filter(
+ record
->
+
isQueueMatch(
+
assignedMessageQueue,
+
record))
+
.collect(Collectors.toList());
+ long lastOffset = -1;
+ for (MessageExt record :
messages) {
+
deserializationSchema.deserialize(
+ record.getBody(),
output);
+ lastOffset =
record.getQueueOffset();
+ if
(Boundedness.BOUNDED.equals(
+
context.getBoundedness())
+ &&
record.getQueueOffset()
+ >=
sourceSplit.getEndOffset()) {
+ break;
}
}
+ if (lastOffset >= 0) {
+ // set start offset for
next poll cycleLife
+
sourceSplit.setStartOffset(lastOffset + 1);
+
rocketMqConsumerThread.markLastPolledOffset(
+ lastOffset);
+ }
+ if (lastOffset >=
sourceSplit.getEndOffset()) {
+ // just for bounded mode
+
sourceSplit.setEndOffset(lastOffset);
+ }
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
@@ -200,6 +184,12 @@ public class RocketMqSourceReader implements
SourceReader<SeaTunnelRow, RocketMq
}
}
+    /**
+     * Tells whether a polled record belongs to the given message queue, comparing topic, broker
+     * name and queue id in that order.
+     *
+     * @param assignedMessageQueue queue the consumer thread is bound to
+     * @param record polled message to test
+     * @return true when the record's topic, broker name and queue id all match the queue
+     */
+    private boolean isQueueMatch(MessageQueue assignedMessageQueue, MessageExt record) {
+        if (!Objects.equals(assignedMessageQueue.getTopic(), record.getTopic())) {
+            return false;
+        }
+        if (!Objects.equals(assignedMessageQueue.getBrokerName(), record.getBrokerName())) {
+            return false;
+        }
+        // Queue ids are primitive ints: compare directly instead of boxing.
+        return assignedMessageQueue.getQueueId() == record.getQueueId();
+    }
+ }
+
@Override
public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws
Exception {
List<RocketMqSourceSplit> pendingSplit =
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index aba0a9f2c0..01d7c77683 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -37,7 +37,9 @@ import
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
import
org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.engine.common.Constant;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -68,10 +70,12 @@ import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.time.LocalDate;
import java.time.LocalDateTime;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import static
org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
@@ -214,6 +218,27 @@ public class RocketMqIT extends TestSuiteBase implements
TestResource {
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "flink and spark won't commit offset when batch job finished")
+    public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer container)
+            throws IOException, InterruptedException {
+        // Publish generated text rows, run the batch job that consumes them, then verify that
+        // the consumer group's current offset equals the max offset of every queue.
+        String topic = "test_topic_text_offset_check";
+        DefaultSeaTunnelRowSerializer textSerializer =
+                new DefaultSeaTunnelRowSerializer(
+                        topic, SEATUNNEL_ROW_TYPE, SchemaFormat.TEXT, DEFAULT_FIELD_DELIMITER);
+        generateTestData(row -> textSerializer.serializeRow(row), topic, 0, 10);
+
+        Container.ExecResult jobResult =
+                container.executeJob("/rocketmq-source_tex_with_offset_check.conf");
+        Assertions.assertEquals(0, jobResult.getExitCode(), jobResult.getStderr());
+
+        checkOffsetNoDiff(topic, "SeaTunnel-Consumer-Group");
+    }
+
@TestTemplate
public void testSourceRocketMqJsonToConsole(TestContainer container)
throws IOException, InterruptedException {
@@ -375,6 +400,26 @@ public class RocketMqIT extends TestSuiteBase implements
TestResource {
return data;
}
+    /**
+     * Asserts that, for every queue of the topic, the consumer group's current offset equals the
+     * queue's max offset (i.e. nothing is left unconsumed/uncommitted).
+     *
+     * @param topicName topic whose queues are inspected
+     * @param consumerGroup consumer group whose offsets are checked
+     */
+    private void checkOffsetNoDiff(String topicName, String consumerGroup) {
+        RocketMqBaseConfiguration adminConfig = newConfiguration();
+        adminConfig.setGroupId(consumerGroup);
+        List<String> topics = Arrays.asList(topicName);
+        Map<MessageQueue, TopicOffset> topicOffsets =
+                RocketMqAdminUtil.offsetTopics(adminConfig, topics).get(0);
+        Set<MessageQueue> queues = topicOffsets.keySet();
+        Map<MessageQueue, Long> consumedOffsets =
+                RocketMqAdminUtil.currentOffsets(adminConfig, topics, queues);
+        topicOffsets.forEach(
+                (queue, topicOffset) -> {
+                    long maxOffset = topicOffset.getMaxOffset();
+                    Long consumeOffset = consumedOffsets.get(queue);
+                    Assertions.assertEquals(
+                            maxOffset,
+                            consumeOffset,
+                            "Offset different,maxOffset=" + maxOffset + ",consumeOffset=" + consumeOffset);
+                });
+    }
+
public RocketMqBaseConfiguration newConfiguration() {
return RocketMqBaseConfiguration.newBuilder()
.groupId(ROCKETMQ_GROUP)
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
new file mode 100644
index 0000000000..094660837a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in seatunnel config
+######
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+ checkpoint.interval = 1000
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 1
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ Rocketmq {
+ name.srv.addr = "rocketmq-e2e:9876"
+ topics = "test_topic_text_offset_check"
+ result_table_name = "rocketmq_table"
+ consumer.group = "SeaTunnel-Consumer-Group"
+ schema = {
+ fields {
+ id = bigint
+ c_map = "map<string, smallint>"
+ c_array = "array<tinyint>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ c_int = int
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_decimal = "decimal(2, 1)"
+ c_bytes = bytes
+ c_date = date
+ c_timestamp = timestamp
+ }
+ }
+ format = text
+ # The default field delimiter is ","
+ field_delimiter = ","
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {
+ source_table_name = "rocketmq_table"
+ }
+}
\ No newline at end of file