This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 999cfd606 [Improve][Connector-V2-kafka] Support for dynamic discover 
topic & partition in streaming mode (#3125)
999cfd606 is described below

commit 999cfd6069d128b26f19d3ed268a90695f7f567b
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Nov 22 09:35:45 2022 +0800

    [Improve][Connector-V2-kafka] Support for dynamic discover topic & 
partition in streaming mode (#3125)
    
    [Improve][Connector-V2-kafka] Support for dynamic discover topic & 
partition in streaming mode
    
    Co-authored-by: zhouyao <[email protected]>
    Co-authored-by: Eric <[email protected]>
---
 docs/en/connector-v2/source/kafka.md               | 36 +++++++-----
 .../connectors/seatunnel/kafka/config/Config.java  |  8 +++
 .../seatunnel/kafka/source/KafkaSource.java        | 10 +++-
 .../seatunnel/kafka/source/KafkaSourceFactory.java | 11 ++--
 .../seatunnel/kafka/source/KafkaSourceReader.java  | 25 +++++---
 .../kafka/source/KafkaSourceSplitEnumerator.java   | 66 +++++++++++++++++++---
 6 files changed, 119 insertions(+), 37 deletions(-)

diff --git a/docs/en/connector-v2/source/kafka.md 
b/docs/en/connector-v2/source/kafka.md
index a0c660b26..80a1cba60 100644
--- a/docs/en/connector-v2/source/kafka.md
+++ b/docs/en/connector-v2/source/kafka.md
@@ -17,20 +17,21 @@ Source connector for Apache Kafka.
 
 ## Options
 
-| name                 | type    | required | default value            |
-|----------------------|---------| -------- |--------------------------|
-| topic                | String  | yes      | -                        |
-| bootstrap.servers    | String  | yes      | -                        |
-| pattern              | Boolean | no       | false                    |
-| consumer.group       | String  | no       | SeaTunnel-Consumer-Group |
-| commit_on_checkpoint | Boolean | no       | true                     |
-| kafka.*              | String  | no       | -                        |
-| common-options       | config  | no       | -                        |
-| schema               |         | no       | -                        |
-| format               | String  | no       | json                     |
-| start_mode           | String  | no       | group_offsets            |
-| start_mode.offsets   |         | no       |                          |
-| start_mode.timestamp | Long    | no       |                          |
+| name                                | type    | required | default value     
       |
+|-------------------------------------|---------| -------- 
|--------------------------|
+| topic                               | String  | yes      | -                 
       |
+| bootstrap.servers                   | String  | yes      | -                 
       |
+| pattern                             | Boolean | no       | false             
       |
+| consumer.group                      | String  | no       | 
SeaTunnel-Consumer-Group |
+| commit_on_checkpoint                | Boolean | no       | true              
       |
+| kafka.*                             | String  | no       | -                 
       |
+| common-options                      | config  | no       | -                 
       |
+| schema                              |         | no       | -                 
       |
+| format                              | String  | no       | json              
       |
+| start_mode                          | String  | no       | group_offsets     
       |
+| start_mode.offsets                  |         | no       |                   
       |
+| start_mode.timestamp                | Long    | no       |                   
       |
+| partition-discovery.interval-millis | Long    | no       | -1                
       |
 
 ### topic [string]
 
@@ -52,6 +53,10 @@ If `pattern` is set to `true`, the regular expression for a 
pattern of topic name
 
 If true the consumer's offset will be periodically committed in the background.
 
+### partition-discovery.interval-millis [long]
+
+The interval for dynamically discovering topics and partitions.
+
 ### kafka.* [string]
 
 In addition to the above necessary parameters that must be specified by the 
`Kafka consumer` client, users can also specify multiple `consumer` client 
non-mandatory parameters, covering [all consumer parameters specified in the 
official Kafka 
document](https://kafka.apache.org/documentation.html#consumerconfigs).
@@ -147,4 +152,5 @@ source {
 
 ### Next Version
 
-- [Improve] Support setting read starting offset or time at startup config 
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
\ No newline at end of file
+- [Improve] Support setting read starting offset or time at startup config 
([3157](https://github.com/apache/incubator-seatunnel/pull/3157))
+- [Improve] Support for dynamic discover topic & partition in streaming mode 
([3125](https://github.com/apache/incubator-seatunnel/pull/3125))
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
index 6030bfa39..d4f982bca 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/config/Config.java
@@ -122,4 +122,12 @@ public class Config {
             .noDefaultValue()
             .withDescription("The offset required for consumption mode to be 
specific_offsets.");
 
+    /**
+     * Configuration key to define the consumer's partition discovery 
interval, in milliseconds.
+     */
+    public static final Option<Long> KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS = 
Options.key("partition-discovery.interval-millis")
+        .longType()
+        .defaultValue(-1L)
+        .withDescription("The interval for dynamically discovering topics and 
partitions.");
+
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
index 79d1328b4..01ab14cc6 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSource.java
@@ -24,6 +24,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFA
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.DEFAULT_FORMAT;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FIELD_DELIMITER;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.FORMAT;
+import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.PATTERN;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.SCHEMA;
 import static 
org.apache.seatunnel.connectors.seatunnel.kafka.config.Config.START_MODE;
@@ -72,6 +73,7 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
     private DeserializationSchema<SeaTunnelRow> deserializationSchema;
     private SeaTunnelRowType typeInfo;
     private JobContext jobContext;
+    private long discoveryIntervalMillis = 
KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.defaultValue();
 
     @Override
     public Boundedness getBoundedness() {
@@ -140,6 +142,10 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
             }
         }
 
+        if (config.hasPath(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key())) {
+            this.discoveryIntervalMillis = 
config.getLong(KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS.key());
+        }
+
         TypesafeConfigUtils.extractSubConfig(config, "kafka.", 
false).entrySet().forEach(e -> {
             this.metadata.getProperties().put(e.getKey(), 
String.valueOf(e.getValue().unwrapped()));
         });
@@ -159,12 +165,12 @@ public class KafkaSource implements 
SeaTunnelSource<SeaTunnelRow, KafkaSourceSpl
 
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
createEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext) throws Exception {
-        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext);
+        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext, discoveryIntervalMillis);
     }
 
     @Override
     public SourceSplitEnumerator<KafkaSourceSplit, KafkaSourceState> 
restoreEnumerator(SourceSplitEnumerator.Context<KafkaSourceSplit> 
enumeratorContext, KafkaSourceState checkpointState) throws Exception {
-        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext, checkpointState);
+        return new KafkaSourceSplitEnumerator(this.metadata, 
enumeratorContext, checkpointState, discoveryIntervalMillis);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
index 222a48bde..747a3542f 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceFactory.java
@@ -37,10 +37,11 @@ public class KafkaSourceFactory implements 
TableSourceFactory {
     @Override
     public OptionRule optionRule() {
         return OptionRule.builder()
-                .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
-                .optional(Config.PATTERN, Config.CONSUMER_GROUP, 
Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA, 
Config.FORMAT)
-                .conditional(Condition.of(Config.START_MODE, 
StartMode.TIMESTAMP), Config.START_MODE_TIMESTAMP)
-                .conditional(Condition.of(Config.START_MODE, 
StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
-                .build();
+            .required(Config.TOPIC, Config.BOOTSTRAP_SERVERS)
+            .optional(Config.PATTERN, Config.CONSUMER_GROUP, 
Config.COMMIT_ON_CHECKPOINT, Config.KAFKA_CONFIG_PREFIX, Config.SCHEMA,
+                Config.FORMAT, Config.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS)
+            .conditional(Condition.of(Config.START_MODE, StartMode.TIMESTAMP), 
Config.START_MODE_TIMESTAMP)
+            .conditional(Condition.of(Config.START_MODE, 
StartMode.SPECIFIC_OFFSETS), Config.START_MODE_OFFSETS)
+            .build();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
index 24252ce68..7eb68237f 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceReader.java
@@ -41,9 +41,9 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
 import java.util.stream.Collectors;
 
 @Slf4j
@@ -56,11 +56,14 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
     private final ConsumerMetadata metadata;
     private final Set<KafkaSourceSplit> sourceSplits;
     private final Map<Long, Map<TopicPartition, Long>> checkpointOffsetMap;
-    private final ConcurrentMap<TopicPartition, KafkaSourceSplit> 
sourceSplitMap;
     private final Map<TopicPartition, KafkaConsumerThread> consumerThreadMap;
     private final ExecutorService executorService;
     private final DeserializationSchema<SeaTunnelRow> deserializationSchema;
 
+    private final LinkedBlockingQueue<KafkaSourceSplit> pendingPartitionsQueue;
+
+    private volatile boolean running = false;
+
     KafkaSourceReader(ConsumerMetadata metadata,
                       DeserializationSchema<SeaTunnelRow> 
deserializationSchema,
                       SourceReader.Context context) {
@@ -69,10 +72,10 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
         this.sourceSplits = new HashSet<>();
         this.deserializationSchema = deserializationSchema;
         this.consumerThreadMap = new ConcurrentHashMap<>();
-        this.sourceSplitMap = new ConcurrentHashMap<>();
         this.checkpointOffsetMap = new ConcurrentHashMap<>();
         this.executorService = Executors.newCachedThreadPool(
             r -> new Thread(r, "Kafka Source Data Consumer"));
+        pendingPartitionsQueue = new LinkedBlockingQueue<>();
     }
 
     @Override
@@ -88,10 +91,14 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
 
     @Override
     public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
-        if (sourceSplitMap.isEmpty()) {
+        if (!running) {
             Thread.sleep(THREAD_WAIT_TIME);
             return;
         }
+
+        while (pendingPartitionsQueue.size() != 0) {
+            sourceSplits.add(pendingPartitionsQueue.poll());
+        }
         sourceSplits.forEach(sourceSplit -> 
consumerThreadMap.computeIfAbsent(sourceSplit.getTopicPartition(), s -> {
             KafkaConsumerThread thread = new KafkaConsumerThread(metadata);
             executorService.submit(thread);
@@ -157,9 +164,13 @@ public class KafkaSourceReader implements 
SourceReader<SeaTunnelRow, KafkaSource
 
     @Override
     public void addSplits(List<KafkaSourceSplit> splits) {
-        sourceSplits.addAll(splits);
-        sourceSplits.forEach(split -> {
-            sourceSplitMap.put(split.getTopicPartition(), split);
+        running = true;
+        splits.forEach(s -> {
+            try {
+                pendingPartitionsQueue.put(s);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
         });
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
index 59c1aba7f..71760b96b 100644
--- 
a/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
+++ 
b/seatunnel-connectors-v2/connector-kafka/src/main/java/org/apache/seatunnel/connectors/seatunnel/kafka/source/KafkaSourceSplitEnumerator.java
@@ -38,6 +38,10 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -48,10 +52,13 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
 
     private final ConsumerMetadata metadata;
     private final Context<KafkaSourceSplit> context;
+    private long discoveryIntervalMillis;
     private AdminClient adminClient;
 
     private Map<TopicPartition, KafkaSourceSplit> pendingSplit;
     private final Map<TopicPartition, KafkaSourceSplit> assignedSplit;
+    private ScheduledExecutorService executor;
+    private ScheduledFuture scheduledFuture;
 
     KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context) {
         this.metadata = metadata;
@@ -65,20 +72,43 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
         this(metadata, context);
     }
 
+    KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context,
+                               long discoveryIntervalMillis) {
+        this(metadata, context);
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+    }
+
+    KafkaSourceSplitEnumerator(ConsumerMetadata metadata, 
Context<KafkaSourceSplit> context,
+                               KafkaSourceState sourceState, long 
discoveryIntervalMillis) {
+        this(metadata, context, sourceState);
+        this.discoveryIntervalMillis = discoveryIntervalMillis;
+    }
+
     @Override
     public void open() {
         this.adminClient = initAdminClient(this.metadata.getProperties());
+        if (discoveryIntervalMillis > 0) {
+            this.executor = Executors.newScheduledThreadPool(1, runnable -> {
+                Thread thread = new Thread(runnable);
+                thread.setDaemon(true);
+                thread.setName("kafka-partition-dynamic-discovery");
+                return thread;
+            });
+            this.scheduledFuture = executor.scheduleWithFixedDelay(
+                () -> {
+                    try {
+                        discoverySplits();
+                    } catch (Exception e) {
+                        log.error("Dynamic discovery failure:", e);
+                    }
+                }, discoveryIntervalMillis, discoveryIntervalMillis, 
TimeUnit.MILLISECONDS
+            );
+        }
     }
 
     @Override
     public void run() throws ExecutionException, InterruptedException {
-        getTopicInfo().forEach(split -> {
-            if (!assignedSplit.containsKey(split.getTopicPartition())) {
-                if (!pendingSplit.containsKey(split.getTopicPartition())) {
-                    pendingSplit.put(split.getTopicPartition(), split);
-                }
-            }
-        });
+        fetchPendingPartitionSplit();
         setPartitionStartOffset();
         assignSplit();
     }
@@ -117,6 +147,12 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
         if (this.adminClient != null) {
             adminClient.close();
         }
+        if (scheduledFuture != null) {
+            scheduledFuture.cancel(false);
+            if (executor != null) {
+                executor.shutdownNow();
+            }
+        }
     }
 
     @Override
@@ -197,7 +233,7 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
         }).collect(Collectors.toSet());
     }
 
-    private void assignSplit() {
+    private synchronized void assignSplit() {
         Map<Integer, List<KafkaSourceSplit>> readySplit = new 
HashMap<>(Common.COLLECTION_SIZE);
         for (int taskID = 0; taskID < context.currentParallelism(); taskID++) {
             readySplit.computeIfAbsent(taskID, id -> new ArrayList<>());
@@ -260,4 +296,18 @@ public class KafkaSourceSplitEnumerator implements 
SourceSplitEnumerator<KafkaSo
             .get();
     }
 
+    private void discoverySplits() throws ExecutionException, 
InterruptedException {
+        fetchPendingPartitionSplit();
+        assignSplit();
+    }
+
+    private void fetchPendingPartitionSplit() throws ExecutionException, 
InterruptedException {
+        getTopicInfo().forEach(split -> {
+            if (!assignedSplit.containsKey(split.getTopicPartition())) {
+                if (!pendingSplit.containsKey(split.getTopicPartition())) {
+                    pendingSplit.put(split.getTopicPartition(), split);
+                }
+            }
+        });
+    }
 }

Reply via email to