This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b7480e1a89 [Fix][connector-rocketmq] commit a correct offset to broker 
& reduce ThreadInterruptedException log (#6668)
b7480e1a89 is described below

commit b7480e1a890040479fb7c39385d811df1508f61a
Author: YalikWang <[email protected]>
AuthorDate: Tue Apr 16 10:03:13 2024 +0800

    [Fix][connector-rocketmq] commit a correct offset to broker & reduce 
ThreadInterruptedException log (#6668)
---
 .../rocketmq/source/RocketMqConsumerThread.java    | 30 +++++++
 .../rocketmq/source/RocketMqSourceReader.java      | 94 ++++++++++------------
 .../e2e/connector/rocketmq/RocketMqIT.java         | 45 +++++++++++
 .../rocketmq-source_tex_with_offset_check.conf     | 72 +++++++++++++++++
 4 files changed, 189 insertions(+), 52 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
index bfd34c3036..fcdef8bb94 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqConsumerThread.java
@@ -23,7 +23,10 @@ import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
 
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
 import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.message.MessageQueue;
 
+import java.util.Collections;
+import java.util.Objects;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Consumer;
@@ -33,6 +36,11 @@ public class RocketMqConsumerThread implements Runnable {
     private final ConsumerMetadata metadata;
     private final LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> tasks;
 
+    private MessageQueue assignedMessageQueue;
+
+    /** Differs from the committed offset: this is only the last offset 
that has been polled. */
+    private long lastPolledOffset = -2;
+
     public RocketMqConsumerThread(ConsumerMetadata metadata) {
         this.metadata = metadata;
         this.tasks = new LinkedBlockingQueue<>();
@@ -70,4 +78,26 @@ public class RocketMqConsumerThread implements Runnable {
     public LinkedBlockingQueue<Consumer<DefaultLitePullConsumer>> getTasks() {
         return tasks;
     }
+
+    public void assign(RocketMqSourceSplit sourceSplit) throws 
MQClientException {
+        boolean messageQueueChanged =
+                assignedMessageQueue == null
+                        || !Objects.equals(assignedMessageQueue, 
sourceSplit.getMessageQueue());
+        if (messageQueueChanged) {
+            this.assignedMessageQueue = sourceSplit.getMessageQueue();
+            consumer.assign(Collections.singleton(assignedMessageQueue));
+        }
+        if (messageQueueChanged || lastPolledOffset != 
sourceSplit.getStartOffset() - 1) {
+            if (sourceSplit.getStartOffset() >= 0) {
+                Long committedOffset = 
consumer.committed(assignedMessageQueue);
+                if (!Objects.equals(committedOffset, 
sourceSplit.getStartOffset())) {
+                    consumer.seek(assignedMessageQueue, 
sourceSplit.getStartOffset());
+                }
+            }
+        }
+    }
+
+    public void markLastPolledOffset(long offset) {
+        this.lastPolledOffset = offset;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
index 2beef96f1b..90bc8f3231 100644
--- 
a/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-rocketmq/src/main/java/org/apache/seatunnel/connectors/seatunnel/rocketmq/source/RocketMqSourceReader.java
@@ -18,7 +18,6 @@
 package org.apache.seatunnel.connectors.seatunnel.rocketmq.source;
 
 import org.apache.seatunnel.shade.com.google.common.collect.Maps;
-import org.apache.seatunnel.shade.com.google.common.collect.Sets;
 
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -37,6 +36,7 @@ import java.io.IOException;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -50,7 +50,7 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
 
     private static final long THREAD_WAIT_TIME = 500L;
 
-    private final SourceReader.Context context;
+    private final Context context;
     private final ConsumerMetadata metadata;
     private final Set<RocketMqSourceSplit> sourceSplits;
     private final Map<Long, Map<MessageQueue, Long>> checkpointOffsets;
@@ -65,7 +65,7 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
     public RocketMqSourceReader(
             ConsumerMetadata metadata,
             DeserializationSchema<SeaTunnelRow> deserializationSchema,
-            SourceReader.Context context) {
+            Context context) {
         this.metadata = metadata;
         this.context = context;
         this.sourceSplits = new HashSet<>();
@@ -115,21 +115,16 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
                 sourceSplit -> {
                     CompletableFuture<Void> completableFuture = new 
CompletableFuture<>();
                     try {
-                        consumerThreads
-                                .get(sourceSplit.getMessageQueue())
+                        RocketMqConsumerThread rocketMqConsumerThread =
+                                
consumerThreads.get(sourceSplit.getMessageQueue());
+                        rocketMqConsumerThread
                                 .getTasks()
                                 .put(
                                         consumer -> {
                                             try {
-                                                Set<MessageQueue> 
messageQueues =
-                                                        Sets.newHashSet(
-                                                                
sourceSplit.getMessageQueue());
-                                                consumer.assign(messageQueues);
-                                                if 
(sourceSplit.getStartOffset() >= 0) {
-                                                    consumer.seek(
-                                                            
sourceSplit.getMessageQueue(),
-                                                            
sourceSplit.getStartOffset());
-                                                }
+                                                
rocketMqConsumerThread.assign(sourceSplit);
+                                                MessageQueue 
assignedMessageQueue =
+                                                        
sourceSplit.getMessageQueue();
                                                 List<MessageExt> records =
                                                         consumer.poll(
                                                                 
metadata.getBaseConfig()
@@ -141,47 +136,36 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
                                                             
sourceSplit.getStartOffset(),
                                                             
sourceSplit.getEndOffset());
                                                 }
-                                                Map<MessageQueue, 
List<MessageExt>> groupRecords =
+                                                List<MessageExt> messages =
                                                         records.stream()
-                                                                .collect(
-                                                                        
Collectors.groupingBy(
-                                                                               
 record ->
-                                                                               
         new MessageQueue(
-                                                                               
                 record
-                                                                               
                         .getTopic(),
-                                                                               
                 record
-                                                                               
                         .getBrokerName(),
-                                                                               
                 record
-                                                                               
                         .getQueueId())));
-                                                for (MessageQueue messageQueue 
: messageQueues) {
-                                                    if 
(!groupRecords.containsKey(messageQueue)) {
-                                                        continue;
-                                                    }
-                                                    List<MessageExt> messages =
-                                                            
groupRecords.get(messageQueue);
-                                                    for (MessageExt record : 
messages) {
-                                                        
deserializationSchema.deserialize(
-                                                                
record.getBody(), output);
-                                                        if 
(Boundedness.BOUNDED.equals(
-                                                                        
context.getBoundedness())
-                                                                && 
record.getQueueOffset()
-                                                                        >= 
sourceSplit
-                                                                               
 .getEndOffset()) {
-                                                            break;
-                                                        }
-                                                    }
-                                                    long lastOffset = -1;
-                                                    if (!messages.isEmpty()) {
-                                                        lastOffset =
-                                                                
messages.get(messages.size() - 1)
-                                                                        
.getQueueOffset();
-                                                        
sourceSplit.setStartOffset(lastOffset);
-                                                    }
-
-                                                    if (lastOffset >= 
sourceSplit.getEndOffset()) {
-                                                        
sourceSplit.setEndOffset(lastOffset);
+                                                                .filter(
+                                                                        record 
->
+                                                                               
 isQueueMatch(
+                                                                               
         assignedMessageQueue,
+                                                                               
         record))
+                                                                
.collect(Collectors.toList());
+                                                long lastOffset = -1;
+                                                for (MessageExt record : 
messages) {
+                                                    
deserializationSchema.deserialize(
+                                                            record.getBody(), 
output);
+                                                    lastOffset = 
record.getQueueOffset();
+                                                    if 
(Boundedness.BOUNDED.equals(
+                                                                    
context.getBoundedness())
+                                                            && 
record.getQueueOffset()
+                                                                    >= 
sourceSplit.getEndOffset()) {
+                                                        break;
                                                     }
                                                 }
+                                                if (lastOffset >= 0) {
+                                                    // set start offset for 
the next poll cycle
+                                                    
sourceSplit.setStartOffset(lastOffset + 1);
+                                                    
rocketMqConsumerThread.markLastPolledOffset(
+                                                            lastOffset);
+                                                }
+                                                if (lastOffset >= 
sourceSplit.getEndOffset()) {
+                                                    // just for bounded mode
+                                                    
sourceSplit.setEndOffset(lastOffset);
+                                                }
                                             } catch (Exception e) {
                                                 
completableFuture.completeExceptionally(e);
                                             }
@@ -200,6 +184,12 @@ public class RocketMqSourceReader implements 
SourceReader<SeaTunnelRow, RocketMq
         }
     }
 
+    private boolean isQueueMatch(MessageQueue assignedMessageQueue, MessageExt 
record) {
+        return Objects.equals(assignedMessageQueue.getTopic(), 
record.getTopic())
+                && Objects.equals(assignedMessageQueue.getBrokerName(), 
record.getBrokerName())
+                && Objects.equals(assignedMessageQueue.getQueueId(), 
record.getQueueId());
+    }
+
     @Override
     public List<RocketMqSourceSplit> snapshotState(long checkpointId) throws 
Exception {
         List<RocketMqSourceSplit> pendingSplit =
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
index aba0a9f2c0..01d7c77683 100644
--- 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/java/org/apache/seatunnel/e2e/connector/rocketmq/RocketMqIT.java
@@ -37,7 +37,9 @@ import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.exception.RocketMqConn
 import 
org.apache.seatunnel.connectors.seatunnel.rocketmq.serialize.DefaultSeaTunnelRowSerializer;
 import org.apache.seatunnel.e2e.common.TestResource;
 import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.EngineType;
 import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
 import org.apache.seatunnel.engine.common.Constant;
 
 import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
@@ -68,10 +70,12 @@ import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.UUID;
 
 import static 
org.apache.seatunnel.e2e.connector.rocketmq.RocketMqContainer.NAMESRV_PORT;
@@ -214,6 +218,27 @@ public class RocketMqIT extends TestSuiteBase implements 
TestResource {
         Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
     }
 
+    @TestTemplate
+    @DisabledOnContainer(
+            value = {},
+            type = {EngineType.SPARK, EngineType.FLINK},
+            disabledReason = "flink and spark won't commit offset when batch 
job finished")
+    public void testSourceRocketMqTextToConsoleWithOffsetCheck(TestContainer 
container)
+            throws IOException, InterruptedException {
+        DefaultSeaTunnelRowSerializer serializer =
+                new DefaultSeaTunnelRowSerializer(
+                        "test_topic_text_offset_check",
+                        SEATUNNEL_ROW_TYPE,
+                        SchemaFormat.TEXT,
+                        DEFAULT_FIELD_DELIMITER);
+        generateTestData(
+                row -> serializer.serializeRow(row), 
"test_topic_text_offset_check", 0, 10);
+        Container.ExecResult execResult =
+                
container.executeJob("/rocketmq-source_tex_with_offset_check.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+        checkOffsetNoDiff("test_topic_text_offset_check", 
"SeaTunnel-Consumer-Group");
+    }
+
     @TestTemplate
     public void testSourceRocketMqJsonToConsole(TestContainer container)
             throws IOException, InterruptedException {
@@ -375,6 +400,26 @@ public class RocketMqIT extends TestSuiteBase implements 
TestResource {
         return data;
     }
 
+    private void checkOffsetNoDiff(String topicName, String consumerGroup) {
+        RocketMqBaseConfiguration config = newConfiguration();
+        config.setGroupId(consumerGroup);
+        List<Map<MessageQueue, TopicOffset>> offsetTopics =
+                RocketMqAdminUtil.offsetTopics(config, 
Arrays.asList(topicName));
+        Map<MessageQueue, TopicOffset> offsetMap = offsetTopics.get(0);
+        Set<MessageQueue> messageQueues = offsetMap.keySet();
+        Map<MessageQueue, Long> currentOffsets =
+                RocketMqAdminUtil.currentOffsets(config, 
Arrays.asList(topicName), messageQueues);
+        for (Map.Entry<MessageQueue, TopicOffset> offsetEntry : 
offsetMap.entrySet()) {
+            MessageQueue messageQueue = offsetEntry.getKey();
+            long maxOffset = offsetEntry.getValue().getMaxOffset();
+            Long consumeOffset = currentOffsets.get(messageQueue);
+            Assertions.assertEquals(
+                    maxOffset,
+                    consumeOffset,
+                    "Offset different,maxOffset=" + maxOffset + 
",consumeOffset=" + consumeOffset);
+        }
+    }
+
     public RocketMqBaseConfiguration newConfiguration() {
         return RocketMqBaseConfiguration.newBuilder()
                 .groupId(ROCKETMQ_GROUP)
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
new file mode 100644
index 0000000000..094660837a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-rocketmq-e2e/src/test/resources/rocketmq-source_tex_with_offset_check.conf
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+  checkpoint.interval = 1000
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  Rocketmq {
+    name.srv.addr = "rocketmq-e2e:9876"
+    topics = "test_topic_text_offset_check"
+    result_table_name = "rocketmq_table"
+    consumer.group = "SeaTunnel-Consumer-Group"
+    schema = {
+      fields {
+        id = bigint
+        c_map = "map<string, smallint>"
+        c_array = "array<tinyint>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(2, 1)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    format = text
+    # The default field delimiter is ","
+    field_delimiter = ","
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {
+    source_table_name = "rocketmq_table"
+  }
+}
\ No newline at end of file

Reply via email to