This is an automated email from the ASF dual-hosted git repository.
bli pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
The following commit(s) were added to refs/heads/main by this push:
new 5b69b2b2 [FLINK-38876] Support per-cluster offset in Dynamic Kafka Source (#209)
5b69b2b2 is described below
commit 5b69b2b2af9bac5b9e6f8faa5b8aaa87b2f2ad1b
Author: bowenli86 <[email protected]>
AuthorDate: Thu Jan 15 16:22:09 2026 -0800
[FLINK-38876] Support per-cluster offset in Dynamic Kafka Source (#209)
- add support for per-cluster starting and stopping offsets in DynamicKafkaSource
- upgrade the source state serializer to v2, keeping it backward compatible with v1 (which does not carry per-cluster offsets)
- add corresponding tests
- update docs and add an example code snippet
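
For reference, a minimal sketch of the usage pattern this change enables, mirroring the docs example added in this patch (the cluster id, bootstrap server, and topic names are placeholders, and MockKafkaMetadataService is a test helper):

```java
Properties cluster0Props = new Properties();
cluster0Props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");

// Per-cluster offsets initializers carried in the metadata override the
// global defaults configured on the builder for that cluster.
KafkaStream stream =
        new KafkaStream(
                "input-stream",
                Map.of(
                        "cluster0",
                        new ClusterMetadata(
                                Set.of("topic-a"),
                                cluster0Props,
                                OffsetsInitializer.earliest(),
                                OffsetsInitializer.latest())));

DynamicKafkaSource<String> source =
        DynamicKafkaSource.<String>builder()
                .setStreamIds(Set.of(stream.getStreamId()))
                .setKafkaMetadataService(new MockKafkaMetadataService(Set.of(stream)))
                .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
                .setStartingOffsets(OffsetsInitializer.earliest()) // global default
                .setBounded(OffsetsInitializer.latest())           // global default
                .build();
```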
---
.../docs/connectors/datastream/dynamic-kafka.md | 48 ++++
.../docs/connectors/datastream/dynamic-kafka.md | 52 ++++
.../kafka/dynamic/metadata/ClusterMetadata.java | 66 ++++-
.../SingleClusterTopicMetadataService.java | 29 +-
.../dynamic/source/DynamicKafkaSourceBuilder.java | 5 +-
.../DynamicKafkaSourceEnumStateSerializer.java | 144 ++++++++--
.../enumerator/DynamicKafkaSourceEnumerator.java | 58 +++-
.../source/reader/DynamicKafkaSourceReader.java | 19 +-
.../dynamic/source/DynamicKafkaSourceITTest.java | 302 +++++++++++++++++++++
.../DynamicKafkaSourceEnumStateSerializerTest.java | 100 ++++++-
.../SingleClusterTopicMetadataServiceTest.java | 84 ++++++
.../DynamicKafkaSourceEnumStateTestUtils.java | 53 ++++
12 files changed, 911 insertions(+), 49 deletions(-)
diff --git a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
index abe7a753..3c9ba7cd 100644
--- a/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content.zh/docs/connectors/datastream/dynamic-kafka.md
@@ -77,6 +77,52 @@ The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataS
The stream ids to subscribe, see the following Kafka stream subscription section for more details.
Deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+### Offsets 初始化
+
+可以通过 builder 配置全局的起始/停止 offsets。起始 offsets 同时适用于有界与无界模式;停止 offsets 仅在有界模式下生效。Cluster metadata 可以可选地携带每个集群的起始/停止 offsets initializer;如果存在,将覆盖全局默认配置。
+
+示例:在元数据中为不同集群设置偏移量初始化规则。
+
+{{< tabs "DynamicKafkaSourceOffsets" >}}
+{{< tab "Java" >}}
+```java
+Properties cluster0Props = new Properties();
+cluster0Props.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+Properties cluster1Props = new Properties();
+cluster1Props.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+KafkaStream stream =
+ new KafkaStream(
+ "input-stream",
+ Map.of(
+ "cluster0",
+ new ClusterMetadata(
+ Set.of("topic-a"),
+ cluster0Props,
+ OffsetsInitializer.earliest(),
+ OffsetsInitializer.latest()),
+ "cluster1",
+ new ClusterMetadata(
+ Set.of("topic-b"),
+ cluster1Props,
+ OffsetsInitializer.latest(),
+ null)));
+
+DynamicKafkaSource<String> source =
+ DynamicKafkaSource.<String>builder()
+ .setStreamIds(Set.of(stream.getStreamId()))
+ .setKafkaMetadataService(new MockKafkaMetadataService(Set.of(stream)))
+ .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ // 如果元数据中包含每个集群的起始 offsets,将覆盖此处设置。
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
### Kafka Stream Subscription
The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
@@ -108,6 +154,8 @@ for any changes to the Kafka stream(s) and reconciling the reader tasks to subsc
Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
+Cluster metadata 可以包含每个集群的起始/停止 offsets initializer,用于覆盖全局 builder 配置。
+
### Additional Properties
There are configuration options in DynamicKafkaSourceOptions that can be
configured in the properties through the builder:
<table class="table table-bordered">
diff --git a/docs/content/docs/connectors/datastream/dynamic-kafka.md b/docs/content/docs/connectors/datastream/dynamic-kafka.md
index e64b93e6..cad80cdd 100644
--- a/docs/content/docs/connectors/datastream/dynamic-kafka.md
+++ b/docs/content/docs/connectors/datastream/dynamic-kafka.md
@@ -77,6 +77,55 @@ The Kafka metadata service, configured by setKafkaMetadataService(KafkaMetadataS
The stream ids to subscribe, see the following Kafka stream subscription section for more details.
Deserializer to parse Kafka messages, see the [Kafka Source Documentation]({{< ref "docs/connectors/datastream/kafka" >}}#deserializer) for more details.
+### Offsets Initialization
+
+You can configure starting and stopping offsets globally via the builder. Starting offsets apply to
+both bounded and unbounded sources, while stopping offsets only take effect when the source runs in
+bounded mode. Cluster metadata may optionally include per-cluster starting or stopping offsets
+initializers; if present, they override the global defaults for that cluster.
+
+Example: override offsets for specific clusters via metadata.
+
+{{< tabs "DynamicKafkaSourceOffsets" >}}
+{{< tab "Java" >}}
+```java
+Properties cluster0Props = new Properties();
+cluster0Props.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+Properties cluster1Props = new Properties();
+cluster1Props.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+KafkaStream stream =
+ new KafkaStream(
+ "input-stream",
+ Map.of(
+ "cluster0",
+ new ClusterMetadata(
+ Set.of("topic-a"),
+ cluster0Props,
+ OffsetsInitializer.earliest(),
+ OffsetsInitializer.latest()),
+ "cluster1",
+ new ClusterMetadata(
+ Set.of("topic-b"),
+ cluster1Props,
+ OffsetsInitializer.latest(),
+ null)));
+
+DynamicKafkaSource<String> source =
+ DynamicKafkaSource.<String>builder()
+ .setStreamIds(Set.of(stream.getStreamId()))
+ .setKafkaMetadataService(new MockKafkaMetadataService(Set.of(stream)))
+ .setDeserializer(KafkaRecordDeserializationSchema.valueOnly(StringDeserializer.class))
+ // Overridden by per-cluster starting offsets in metadata when present.
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .build();
+```
+{{< /tab >}}
+{{< /tabs >}}
+
### Kafka Stream Subscription
The Dynamic Kafka Source provides 2 ways of subscribing to Kafka stream(s).
* A set of Kafka stream ids. For example:
@@ -108,6 +157,9 @@ for any changes to the Kafka stream(s) and reconciling the reader tasks to subsc
Kafka metadata returned by the service. For example, in the case of a Kafka migration, the source would
swap from one cluster to the new cluster when the service makes that change in the Kafka stream metadata.
+Cluster metadata can optionally carry per-cluster starting and stopping offsets initializers. These
+override the global builder configuration for the affected cluster.
+
### Additional Properties
There are configuration options in DynamicKafkaSourceOptions that can be
configured in the properties through the builder:
<table class="table table-bordered">
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
index 964e51e5..8883b010 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/ClusterMetadata.java
@@ -19,6 +19,9 @@
package org.apache.flink.connector.kafka.dynamic.metadata;
import org.apache.flink.annotation.Experimental;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
+
+import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.Objects;
@@ -26,13 +29,15 @@ import java.util.Properties;
import java.util.Set;
/**
- * {@link ClusterMetadata} provides readers information about a cluster on what topics to read and
- * how to connect to a cluster.
+ * {@link ClusterMetadata} provides readers information about a cluster on what topics to read, how
+ * to connect to a cluster, and optional offsets initializers.
*/
@Experimental
public class ClusterMetadata implements Serializable {
private final Set<String> topics;
private final Properties properties;
+ @Nullable private final OffsetsInitializer startingOffsetsInitializer;
+ @Nullable private final OffsetsInitializer stoppingOffsetsInitializer;
/**
* Constructs the {@link ClusterMetadata} with the required properties.
@@ -41,8 +46,26 @@ public class ClusterMetadata implements Serializable {
* @param properties the properties to access a cluster.
*/
public ClusterMetadata(Set<String> topics, Properties properties) {
+ this(topics, properties, null, null);
+ }
+
+ /**
+ * Constructs the {@link ClusterMetadata} with the required properties and
offsets.
+ *
+ * @param topics the topics belonging to a cluster.
+ * @param properties the properties to access a cluster.
+ * @param startingOffsetsInitializer the starting offsets initializer for
the cluster.
+ * @param stoppingOffsetsInitializer the stopping offsets initializer for
the cluster.
+ */
+ public ClusterMetadata(
+ Set<String> topics,
+ Properties properties,
+ @Nullable OffsetsInitializer startingOffsetsInitializer,
+ @Nullable OffsetsInitializer stoppingOffsetsInitializer) {
this.topics = topics;
this.properties = properties;
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
}
/**
@@ -63,9 +86,38 @@ public class ClusterMetadata implements Serializable {
return properties;
}
+ /**
+ * Get the starting offsets initializer for the cluster.
+ *
+ * @return the starting offsets initializer, or null to use the source
default.
+ */
+ @Nullable
+ public OffsetsInitializer getStartingOffsetsInitializer() {
+ return startingOffsetsInitializer;
+ }
+
+ /**
+ * Get the stopping offsets initializer for the cluster.
+ *
+ * @return the stopping offsets initializer, or null to use the source
default.
+ */
+ @Nullable
+ public OffsetsInitializer getStoppingOffsetsInitializer() {
+ return stoppingOffsetsInitializer;
+ }
+
@Override
public String toString() {
- return "ClusterMetadata{" + "topics=" + topics + ", properties=" +
properties + '}';
+ return "ClusterMetadata{"
+ + "topics="
+ + topics
+ + ", properties="
+ + properties
+ + ", startingOffsetsInitializer="
+ + startingOffsetsInitializer
+ + ", stoppingOffsetsInitializer="
+ + stoppingOffsetsInitializer
+ + '}';
}
@Override
@@ -77,11 +129,15 @@ public class ClusterMetadata implements Serializable {
return false;
}
ClusterMetadata that = (ClusterMetadata) o;
- return Objects.equals(topics, that.topics) &&
Objects.equals(properties, that.properties);
+ return Objects.equals(topics, that.topics)
+ && Objects.equals(properties, that.properties)
+ && Objects.equals(startingOffsetsInitializer,
that.startingOffsetsInitializer)
+ && Objects.equals(stoppingOffsetsInitializer,
that.stoppingOffsetsInitializer);
}
@Override
public int hashCode() {
- return Objects.hash(topics, properties);
+ return Objects.hash(
+ topics, properties, startingOffsetsInitializer,
stoppingOffsetsInitializer);
}
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
index 1f2f0fd1..a81c5586 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/metadata/SingleClusterTopicMetadataService.java
@@ -21,10 +21,13 @@ package org.apache.flink.connector.kafka.dynamic.metadata;
import org.apache.flink.annotation.Experimental;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.admin.AdminClient;
+import javax.annotation.Nullable;
+
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -43,6 +46,8 @@ public class SingleClusterTopicMetadataService implements
KafkaMetadataService {
private final String kafkaClusterId;
private final Properties properties;
+ @Nullable private final OffsetsInitializer startingOffsetsInitializer;
+ @Nullable private final OffsetsInitializer stoppingOffsetsInitializer;
private transient AdminClient adminClient;
/**
@@ -52,8 +57,26 @@ public class SingleClusterTopicMetadataService implements
KafkaMetadataService {
* @param properties the properties of the Kafka cluster.
*/
public SingleClusterTopicMetadataService(String kafkaClusterId, Properties
properties) {
+ this(kafkaClusterId, properties, null, null);
+ }
+
+ /**
+ * Create a {@link SingleClusterTopicMetadataService} with per-cluster
offsets initializers.
+ *
+ * @param kafkaClusterId the id of the Kafka cluster.
+ * @param properties the properties of the Kafka cluster.
+ * @param startingOffsetsInitializer the starting offsets initializer for
the cluster.
+ * @param stoppingOffsetsInitializer the stopping offsets initializer for
the cluster.
+ */
+ public SingleClusterTopicMetadataService(
+ String kafkaClusterId,
+ Properties properties,
+ @Nullable OffsetsInitializer startingOffsetsInitializer,
+ @Nullable OffsetsInitializer stoppingOffsetsInitializer) {
this.kafkaClusterId = kafkaClusterId;
this.properties = properties;
+ this.startingOffsetsInitializer = startingOffsetsInitializer;
+ this.stoppingOffsetsInitializer = stoppingOffsetsInitializer;
}
/** {@inheritDoc} */
@@ -82,7 +105,11 @@ public class SingleClusterTopicMetadataService implements
KafkaMetadataService {
private KafkaStream createKafkaStream(String topic) {
ClusterMetadata clusterMetadata =
- new ClusterMetadata(Collections.singleton(topic), properties);
+ new ClusterMetadata(
+ Collections.singleton(topic),
+ properties,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer);
return new KafkaStream(topic, Collections.singletonMap(kafkaClusterId,
clusterMetadata));
}
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
index eab37c4e..8e814afc 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceBuilder.java
@@ -105,7 +105,7 @@ public class DynamicKafkaSourceBuilder<T> {
/**
* Set the source in bounded mode and specify what offsets to end at. This
is used for all
- * clusters.
+ * clusters unless overridden by cluster metadata.
*
* @param stoppingOffsetsInitializer the {@link OffsetsInitializer}.
* @return the builder.
@@ -141,7 +141,8 @@ public class DynamicKafkaSourceBuilder<T> {
}
/**
- * Set the starting offsets of the stream. This will be applied to all
clusters.
+ * Set the starting offsets of the stream. This will be applied to all
clusters unless
+ * overridden by cluster metadata.
*
* @param startingOffsetsInitializer the {@link OffsetsInitializer}.
* @return the builder.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
index b34e536c..b7f0b49a 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializer.java
@@ -23,11 +23,15 @@ import
org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.kafka.clients.CommonClientConfigs;
+import javax.annotation.Nullable;
+
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -45,6 +49,7 @@ public class DynamicKafkaSourceEnumStateSerializer
implements SimpleVersionedSerializer<DynamicKafkaSourceEnumState> {
private static final int VERSION_1 = 1;
+ private static final int VERSION_2 = 2;
private final KafkaSourceEnumStateSerializer
kafkaSourceEnumStateSerializer;
@@ -54,7 +59,7 @@ public class DynamicKafkaSourceEnumStateSerializer
@Override
public int getVersion() {
- return VERSION_1;
+ return VERSION_2;
}
@Override
@@ -63,7 +68,7 @@ public class DynamicKafkaSourceEnumStateSerializer
DataOutputStream out = new DataOutputStream(baos)) {
Set<KafkaStream> kafkaStreams = state.getKafkaStreams();
- serialize(kafkaStreams, out);
+ serializeV2(kafkaStreams, out);
Map<String, KafkaSourceEnumState> clusterEnumeratorStates =
state.getClusterEnumeratorStates();
@@ -91,37 +96,39 @@ public class DynamicKafkaSourceEnumStateSerializer
@Override
public DynamicKafkaSourceEnumState deserialize(int version, byte[]
serialized)
throws IOException {
- if (version == VERSION_1) {
- try (ByteArrayInputStream bais = new
ByteArrayInputStream(serialized);
- DataInputStream in = new DataInputStream(bais)) {
- Set<KafkaStream> kafkaStreams = deserialize(in);
-
- Map<String, KafkaSourceEnumState> clusterEnumeratorStates =
new HashMap<>();
- int kafkaSourceEnumStateSerializerVersion = in.readInt();
-
- int clusterEnumeratorStateMapSize = in.readInt();
- for (int i = 0; i < clusterEnumeratorStateMapSize; i++) {
- String kafkaClusterId = in.readUTF();
- int byteArraySize = in.readInt();
- KafkaSourceEnumState kafkaSourceEnumState =
- kafkaSourceEnumStateSerializer.deserialize(
- kafkaSourceEnumStateSerializerVersion,
- readNBytes(in, byteArraySize));
- clusterEnumeratorStates.put(kafkaClusterId,
kafkaSourceEnumState);
- }
+ if (version != VERSION_1 && version != VERSION_2) {
+ throw new IOException(
+ String.format(
+ "The bytes are serialized with version %d, "
+ + "while this deserializer only supports
version up to %d",
+ version, getVersion()));
+ }
+
+ try (ByteArrayInputStream bais = new ByteArrayInputStream(serialized);
+ DataInputStream in = new DataInputStream(bais)) {
+ Set<KafkaStream> kafkaStreams =
+ version == VERSION_1 ? deserializeV1(in) :
deserializeV2(in);
- return new DynamicKafkaSourceEnumState(kafkaStreams,
clusterEnumeratorStates);
+ Map<String, KafkaSourceEnumState> clusterEnumeratorStates = new
HashMap<>();
+ int kafkaSourceEnumStateSerializerVersion = in.readInt();
+
+ int clusterEnumeratorStateMapSize = in.readInt();
+ for (int i = 0; i < clusterEnumeratorStateMapSize; i++) {
+ String kafkaClusterId = in.readUTF();
+ int byteArraySize = in.readInt();
+ KafkaSourceEnumState kafkaSourceEnumState =
+ kafkaSourceEnumStateSerializer.deserialize(
+ kafkaSourceEnumStateSerializerVersion,
+ readNBytes(in, byteArraySize));
+ clusterEnumeratorStates.put(kafkaClusterId,
kafkaSourceEnumState);
}
- }
- throw new IOException(
- String.format(
- "The bytes are serialized with version %d, "
- + "while this deserializer only supports
version up to %d",
- version, getVersion()));
+ return new DynamicKafkaSourceEnumState(kafkaStreams,
clusterEnumeratorStates);
+ }
}
- private void serialize(Set<KafkaStream> kafkaStreams, DataOutputStream
out) throws IOException {
+ private void serializeV2(Set<KafkaStream> kafkaStreams, DataOutputStream
out)
+ throws IOException {
out.writeInt(kafkaStreams.size());
for (KafkaStream kafkaStream : kafkaStreams) {
out.writeUTF(kafkaStream.getStreamId());
@@ -145,11 +152,13 @@ public class DynamicKafkaSourceEnumStateSerializer
.getProperty(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
"Bootstrap servers must be
specified in properties")));
+
writeOffsetsInitializer(clusterMetadata.getStartingOffsetsInitializer(), out);
+
writeOffsetsInitializer(clusterMetadata.getStoppingOffsetsInitializer(), out);
}
}
}
- private Set<KafkaStream> deserialize(DataInputStream in) throws
IOException {
+ private Set<KafkaStream> deserializeV1(DataInputStream in) throws
IOException {
Set<KafkaStream> kafkaStreams = new HashSet<>();
int numStreams = in.readInt();
@@ -179,6 +188,83 @@ public class DynamicKafkaSourceEnumStateSerializer
return kafkaStreams;
}
+ private Set<KafkaStream> deserializeV2(DataInputStream in) throws
IOException {
+ Set<KafkaStream> kafkaStreams = new HashSet<>();
+ int numStreams = in.readInt();
+ for (int i = 0; i < numStreams; i++) {
+ String streamId = in.readUTF();
+ Map<String, ClusterMetadata> clusterMetadataMap = new HashMap<>();
+ int clusterMetadataMapSize = in.readInt();
+ for (int j = 0; j < clusterMetadataMapSize; j++) {
+ String kafkaClusterId = in.readUTF();
+ int topicsSize = in.readInt();
+ Set<String> topics = new HashSet<>();
+ for (int k = 0; k < topicsSize; k++) {
+ topics.add(in.readUTF());
+ }
+
+ String bootstrapServers = in.readUTF();
+ Properties properties = new Properties();
+ properties.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+
+ OffsetsInitializer startingOffsetsInitializer =
readOffsetsInitializer(in);
+ OffsetsInitializer stoppingOffsetsInitializer =
readOffsetsInitializer(in);
+
+ clusterMetadataMap.put(
+ kafkaClusterId,
+ new ClusterMetadata(
+ topics,
+ properties,
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer));
+ }
+
+ kafkaStreams.add(new KafkaStream(streamId, clusterMetadataMap));
+ }
+
+ return kafkaStreams;
+ }
+
+ private static void writeOffsetsInitializer(
+ @Nullable OffsetsInitializer offsetsInitializer, DataOutputStream
out)
+ throws IOException {
+ if (offsetsInitializer == null) {
+ out.writeBoolean(false);
+ return;
+ }
+
+ out.writeBoolean(true);
+ byte[] serializedOffsets =
InstantiationUtil.serializeObject(offsetsInitializer);
+ out.writeInt(serializedOffsets.length);
+ out.write(serializedOffsets);
+ }
+
+ @Nullable
+ private static OffsetsInitializer readOffsetsInitializer(DataInputStream
in)
+ throws IOException {
+ boolean hasOffsetsInitializer = in.readBoolean();
+ if (!hasOffsetsInitializer) {
+ return null;
+ }
+
+ int serializedSize = in.readInt();
+ byte[] serializedOffsets = readNBytes(in, serializedSize);
+ try {
+ return InstantiationUtil.deserializeObject(
+ serializedOffsets, getClassLoaderForOffsets());
+ } catch (ClassNotFoundException e) {
+ throw new IOException("Failed to deserialize OffsetsInitializer",
e);
+ }
+ }
+
+ private static ClassLoader getClassLoaderForOffsets() {
+ ClassLoader classLoader =
Thread.currentThread().getContextClassLoader();
+ return classLoader != null
+ ? classLoader
+ : DynamicKafkaSourceEnumStateSerializer.class.getClassLoader();
+ }
+
private static byte[] readNBytes(DataInputStream in, int size) throws
IOException {
byte[] bytes = new byte[size];
in.readFully(bytes);
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
index 7643e62b..c118b27f 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumerator.java
@@ -41,6 +41,7 @@ import
org.apache.flink.connector.kafka.source.enumerator.subscriber.KafkaSubscr
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import org.apache.flink.util.Preconditions;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -161,10 +162,21 @@ public class DynamicKafkaSourceEnumerator
this.latestKafkaStreams =
dynamicKafkaSourceEnumState.getKafkaStreams();
Map<String, Properties> clusterProperties = new HashMap<>();
+ Map<String, OffsetsInitializer> clusterStartingOffsets = new
HashMap<>();
+ Map<String, OffsetsInitializer> clusterStoppingOffsets = new
HashMap<>();
for (KafkaStream kafkaStream : latestKafkaStreams) {
for (Entry<String, ClusterMetadata> entry :
kafkaStream.getClusterMetadataMap().entrySet()) {
- clusterProperties.put(entry.getKey(),
entry.getValue().getProperties());
+ ClusterMetadata clusterMetadata = entry.getValue();
+ clusterProperties.put(entry.getKey(),
clusterMetadata.getProperties());
+ if (clusterMetadata.getStartingOffsetsInitializer() != null) {
+ clusterStartingOffsets.put(
+ entry.getKey(),
clusterMetadata.getStartingOffsetsInitializer());
+ }
+ if (clusterMetadata.getStoppingOffsetsInitializer() != null) {
+ clusterStoppingOffsets.put(
+ entry.getKey(),
clusterMetadata.getStoppingOffsetsInitializer());
+ }
}
}
@@ -181,7 +193,9 @@ public class DynamicKafkaSourceEnumerator
clusterEnumState.getKey(),
this.latestClusterTopicsMap.get(clusterEnumState.getKey()),
clusterEnumState.getValue(),
- clusterProperties.get(clusterEnumState.getKey()));
+ clusterProperties.get(clusterEnumState.getKey()),
+ clusterStartingOffsets.get(clusterEnumState.getKey()),
+ clusterStoppingOffsets.get(clusterEnumState.getKey()));
}
}
@@ -238,6 +252,8 @@ public class DynamicKafkaSourceEnumerator
Map<String, Set<String>> newClustersTopicsMap = new HashMap<>();
Map<String, Properties> clusterProperties = new HashMap<>();
+ Map<String, OffsetsInitializer> clusterStartingOffsets = new
HashMap<>();
+ Map<String, OffsetsInitializer> clusterStoppingOffsets = new
HashMap<>();
for (KafkaStream kafkaStream : handledFetchKafkaStreams) {
for (Entry<String, ClusterMetadata> entry :
kafkaStream.getClusterMetadataMap().entrySet()) {
@@ -248,6 +264,14 @@ public class DynamicKafkaSourceEnumerator
.computeIfAbsent(kafkaClusterId, (unused) -> new
HashSet<>())
.addAll(clusterMetadata.getTopics());
clusterProperties.put(kafkaClusterId,
clusterMetadata.getProperties());
+ if (clusterMetadata.getStartingOffsetsInitializer() != null) {
+ clusterStartingOffsets.put(
+ kafkaClusterId,
clusterMetadata.getStartingOffsetsInitializer());
+ }
+ if (clusterMetadata.getStoppingOffsetsInitializer() != null) {
+ clusterStoppingOffsets.put(
+ kafkaClusterId,
clusterMetadata.getStoppingOffsetsInitializer());
+ }
}
}
@@ -308,7 +332,9 @@ public class DynamicKafkaSourceEnumerator
activeClusterTopics.getKey(),
activeClusterTopics.getValue(),
newKafkaSourceEnumState,
- clusterProperties.get(activeClusterTopics.getKey()));
+ clusterProperties.get(activeClusterTopics.getKey()),
+ clusterStartingOffsets.get(activeClusterTopics.getKey()),
+ clusterStoppingOffsets.get(activeClusterTopics.getKey()));
}
startAllEnumerators();
@@ -356,7 +382,18 @@ public class DynamicKafkaSourceEnumerator
String kafkaClusterId,
Set<String> topics,
KafkaSourceEnumState kafkaSourceEnumState,
- Properties fetchedProperties) {
+ Properties fetchedProperties,
+ @Nullable OffsetsInitializer clusterStartingOffsetsInitializer,
+ @Nullable OffsetsInitializer clusterStoppingOffsetsInitializer) {
+ OffsetsInitializer effectiveStartingOffsetsInitializer =
+ clusterStartingOffsetsInitializer != null
+ ? clusterStartingOffsetsInitializer
+ : startingOffsetsInitializer;
+ OffsetsInitializer effectiveStoppingOffsetsInitializer =
+ clusterStoppingOffsetsInitializer != null
+ ? clusterStoppingOffsetsInitializer
+ : stoppingOffsetInitializer;
+
final Runnable signalNoMoreSplitsCallback;
if (Boundedness.BOUNDED.equals(boundedness)) {
signalNoMoreSplitsCallback = this::handleNoMoreSplits;
@@ -375,12 +412,18 @@ public class DynamicKafkaSourceEnumerator
KafkaPropertiesUtil.copyProperties(fetchedProperties, consumerProps);
KafkaPropertiesUtil.copyProperties(properties, consumerProps);
KafkaPropertiesUtil.setClientIdPrefix(consumerProps, kafkaClusterId);
+ consumerProps.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ effectiveStartingOffsetsInitializer
+ .getAutoOffsetResetStrategy()
+ .name()
+ .toLowerCase());
KafkaSourceEnumerator enumerator =
new KafkaSourceEnumerator(
KafkaSubscriber.getTopicListSubscriber(new
ArrayList<>(topics)),
- startingOffsetsInitializer,
- stoppingOffsetInitializer,
+ effectiveStartingOffsetsInitializer,
+ effectiveStoppingOffsetsInitializer,
consumerProps,
context,
boundedness,
@@ -484,7 +527,8 @@ public class DynamicKafkaSourceEnumerator
/**
* Besides for checkpointing, this method is used in the restart sequence
to retain the relevant
* assigned splits so that there is no reader duplicate split assignment.
See {@link
- * #createEnumeratorWithAssignedTopicPartitions(String, Set,
KafkaSourceEnumState, Properties)}}
+ * #createEnumeratorWithAssignedTopicPartitions(String, Set,
KafkaSourceEnumState, Properties,
+ * OffsetsInitializer, OffsetsInitializer)}}
*/
@Override
public DynamicKafkaSourceEnumState snapshotState(long checkpointId) throws
Exception {
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
index 38952519..3e6f0534 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/dynamic/source/reader/DynamicKafkaSourceReader.java
@@ -36,6 +36,7 @@ import
org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetri
import
org.apache.flink.connector.kafka.dynamic.source.metrics.KafkaClusterMetricGroupManager;
import
org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
import org.apache.flink.connector.kafka.source.KafkaPropertiesUtil;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import
org.apache.flink.connector.kafka.source.metrics.KafkaSourceReaderMetrics;
import org.apache.flink.connector.kafka.source.reader.KafkaRecordEmitter;
import org.apache.flink.connector.kafka.source.reader.KafkaSourceReader;
@@ -49,6 +50,7 @@ import
org.apache.flink.streaming.runtime.io.MultipleFuturesAvailabilityHelper;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.UserCodeClassLoader;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -246,9 +248,20 @@ public class DynamicKafkaSourceReader<T> implements
SourceReader<T, DynamicKafka
clusterMetadataMapEntry.getKey(), (unused) ->
new HashSet<>())
.addAll(clusterMetadataMapEntry.getValue().getTopics());
- newClustersProperties.put(
- clusterMetadataMapEntry.getKey(),
- clusterMetadataMapEntry.getValue().getProperties());
+ Properties clusterProperties = new Properties();
+ KafkaPropertiesUtil.copyProperties(
+ clusterMetadataMapEntry.getValue().getProperties(),
clusterProperties);
+ OffsetsInitializer startingOffsetsInitializer =
+
clusterMetadataMapEntry.getValue().getStartingOffsetsInitializer();
+ if (startingOffsetsInitializer != null) {
+ clusterProperties.setProperty(
+ ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,
+ startingOffsetsInitializer
+ .getAutoOffsetResetStrategy()
+ .name()
+ .toLowerCase());
+ }
+ newClustersProperties.put(clusterMetadataMapEntry.getKey(),
clusterProperties);
}
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
index 06c2e591..13cc89a4 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/DynamicKafkaSourceITTest.java
@@ -19,13 +19,24 @@
package org.apache.flink.connector.kafka.dynamic.source;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.api.connector.source.mocks.MockSplitEnumeratorContext;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestartStrategyOptions;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import
org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumState;
+import
org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumStateSerializer;
+import
org.apache.flink.connector.kafka.dynamic.source.enumerator.DynamicKafkaSourceEnumerator;
+import
org.apache.flink.connector.kafka.dynamic.source.enumerator.subscriber.KafkaStreamSetSubscriber;
+import
org.apache.flink.connector.kafka.dynamic.source.split.DynamicKafkaSourceSplit;
+import
org.apache.flink.connector.kafka.dynamic.source.testutils.DynamicKafkaSourceEnumStateTestUtils;
import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.NoStoppingOffsetsInitializer;
import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import
org.apache.flink.connector.kafka.source.reader.deserializer.KafkaRecordDeserializationSchema;
import
org.apache.flink.connector.kafka.testutils.DynamicKafkaSourceExternalContextFactory;
@@ -53,8 +64,10 @@ import org.apache.flink.util.CloseableIterator;
import org.apache.flink.util.TestLogger;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -250,6 +263,279 @@ public class DynamicKafkaSourceITTest extends TestLogger {
.collect(Collectors.toList()));
}
+ @Test
+ void testPerClusterOffsetsInitializersInUnboundedMode() throws
Throwable {
+ String topic = "test-per-cluster-unbounded-offsets";
+ DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+ DynamicKafkaSourceTestHelper.createTopic(1, topic, NUM_PARTITIONS);
+
+ int cluster0Start = 0;
+ int cluster0End =
+ DynamicKafkaSourceTestHelper.produceToKafka(
+ 0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT,
cluster0Start);
+ int cluster1Start = cluster0End + 1000;
+ int cluster1InitialEnd =
+ DynamicKafkaSourceTestHelper.produceToKafka(
+ 1, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT,
cluster1Start);
+ int cluster1ExtraStart = cluster1InitialEnd + 1000;
+ AtomicInteger cluster1ExtraEnd = new AtomicInteger(-1);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ Properties properties = new Properties();
+
properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
"0");
+ properties.setProperty(
+
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+ KafkaStream kafkaStream =
+ new KafkaStream(
+ "test-per-cluster-unbounded-stream",
+ ImmutableMap.of(
+
kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+ new ClusterMetadata(
+ Collections.singleton(topic),
+
kafkaClusterTestEnvMetadata0.getStandardProperties(),
+ OffsetsInitializer.earliest(),
+ null),
+
kafkaClusterTestEnvMetadata1.getKafkaClusterId(),
+ new ClusterMetadata(
+ Collections.singleton(topic),
+
kafkaClusterTestEnvMetadata1.getStandardProperties(),
+ OffsetsInitializer.latest(),
+ null)));
+
+ MockKafkaMetadataService mockKafkaMetadataService =
+ new
MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+ DynamicKafkaSource<Integer> dynamicKafkaSource =
+ DynamicKafkaSource.<Integer>builder()
+
.setStreamIds(Collections.singleton(kafkaStream.getStreamId()))
+ .setKafkaMetadataService(mockKafkaMetadataService)
+ .setDeserializer(
+ KafkaRecordDeserializationSchema.valueOnly(
+ IntegerDeserializer.class))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setProperties(properties)
+ .build();
+
+ DataStreamSource<Integer> stream =
+ env.fromSource(
+ dynamicKafkaSource,
+ WatermarkStrategy.noWatermarks(),
+ "dynamic-kafka-src");
+
+ List<Integer> results = new ArrayList<>();
+ try (CloseableIterator<Integer> iterator =
stream.executeAndCollect()) {
+ CommonTestUtils.waitUtil(
+ () -> {
+ try {
+ results.add(iterator.next());
+ if (cluster1ExtraEnd.get() < 0) {
+ cluster1ExtraEnd.set(
+
DynamicKafkaSourceTestHelper.produceToKafka(
+ 1,
+ topic,
+ NUM_PARTITIONS,
+ NUM_RECORDS_PER_SPLIT,
+ cluster1ExtraStart));
+ }
+ } catch (NoSuchElementException e) {
+ // swallow and wait
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ if (cluster1ExtraEnd.get() < 0) {
+ return false;
+ }
+
+ int expectedCount =
+ (cluster0End - cluster0Start)
+ + (cluster1ExtraEnd.get() -
cluster1ExtraStart);
+ return results.size() == expectedCount;
+ },
+ Duration.ofSeconds(15),
+ "Could not obtain the required records within the
timeout");
+ }
+
+ List<Integer> expectedResults =
+ Stream.concat(
+ IntStream.range(cluster0Start,
cluster0End).boxed(),
+ IntStream.range(cluster1ExtraStart,
cluster1ExtraEnd.get())
+ .boxed())
+ .collect(Collectors.toList());
+
assertThat(results).containsExactlyInAnyOrderElementsOf(expectedResults);
+ }
+
+ @Test
+ void testPerClusterOffsetsInitializersInBoundedMode() throws Throwable
{
+ String topic = "test-per-cluster-offsets-initializers";
+ DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+ DynamicKafkaSourceTestHelper.createTopic(1, topic, NUM_PARTITIONS);
+
+ int cluster0Start = 0;
+ int cluster0End =
+ DynamicKafkaSourceTestHelper.produceToKafka(
+ 0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT,
cluster0Start);
+ int cluster1Start = cluster0End + 1000;
+ DynamicKafkaSourceTestHelper.produceToKafka(
+ 1, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT,
cluster1Start);
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(2);
+
+ Properties properties = new Properties();
+
properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
"0");
+ properties.setProperty(
+
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+ Map<TopicPartition, Long> cluster1StoppingOffsets =
+ IntStream.range(0, NUM_PARTITIONS)
+ .boxed()
+ .collect(
+ Collectors.toMap(
+ partition -> new
TopicPartition(topic, partition),
+ partition -> 0L));
+
+ KafkaStream kafkaStream =
+ new KafkaStream(
+ "test-per-cluster-offsets-stream",
+ ImmutableMap.of(
+
kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+ new ClusterMetadata(
+ Collections.singleton(topic),
+
kafkaClusterTestEnvMetadata0.getStandardProperties(),
+ OffsetsInitializer.earliest(),
+ OffsetsInitializer.latest()),
+
kafkaClusterTestEnvMetadata1.getKafkaClusterId(),
+ new ClusterMetadata(
+ Collections.singleton(topic),
+
kafkaClusterTestEnvMetadata1.getStandardProperties(),
+ OffsetsInitializer.earliest(),
+
OffsetsInitializer.offsets(cluster1StoppingOffsets))));
+
+ MockKafkaMetadataService mockKafkaMetadataService =
+ new
MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+ DynamicKafkaSource<Integer> dynamicKafkaSource =
+ DynamicKafkaSource.<Integer>builder()
+
.setStreamIds(Collections.singleton(kafkaStream.getStreamId()))
+ .setKafkaMetadataService(mockKafkaMetadataService)
+ .setDeserializer(
+ KafkaRecordDeserializationSchema.valueOnly(
+ IntegerDeserializer.class))
+ .setStartingOffsets(OffsetsInitializer.earliest())
+ .setBounded(OffsetsInitializer.latest())
+ .setProperties(properties)
+ .build();
+
+ DataStreamSource<Integer> stream =
+ env.fromSource(
+ dynamicKafkaSource,
+ WatermarkStrategy.noWatermarks(),
+ "dynamic-kafka-src");
+
+ List<Integer> results = new ArrayList<>();
+ try (CloseableIterator<Integer> iterator =
stream.executeAndCollect()) {
+ while (iterator.hasNext()) {
+ results.add(iterator.next());
+ }
+ }
+
+ assertThat(results)
+ .containsExactlyInAnyOrderElementsOf(
+ IntStream.range(cluster0Start, cluster0End)
+ .boxed()
+ .collect(Collectors.toList()));
+ }
+
+ @Test
+ void testRestoreFromV1EnumeratorState() throws Throwable {
+ String topic = "test-v1-enum-state-restore";
+ DynamicKafkaSourceTestHelper.createTopic(0, topic, NUM_PARTITIONS);
+ DynamicKafkaSourceTestHelper.produceToKafka(
+ 0, topic, NUM_PARTITIONS, NUM_RECORDS_PER_SPLIT, 0);
+
+ String streamId = "test-v1-enum-stream";
+ String clusterId =
kafkaClusterTestEnvMetadata0.getKafkaClusterId();
+ Properties clusterProperties =
kafkaClusterTestEnvMetadata0.getStandardProperties();
+ String bootstrapServers =
kafkaClusterTestEnvMetadata0.getBrokerConnectionStrings();
+ clusterProperties.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,
bootstrapServers);
+
+ byte[] serializedState =
+ DynamicKafkaSourceEnumStateTestUtils.serializeV1State(
+ streamId, clusterId, Collections.singleton(topic),
bootstrapServers);
+ DynamicKafkaSourceEnumState restoredState =
+ new DynamicKafkaSourceEnumStateSerializer().deserialize(1,
serializedState);
+
+ Properties properties = new Properties();
+
properties.setProperty(KafkaSourceOptions.PARTITION_DISCOVERY_INTERVAL_MS.key(),
"0");
+ properties.setProperty(
+
DynamicKafkaSourceOptions.STREAM_METADATA_DISCOVERY_INTERVAL_MS.key(), "0");
+
+ KafkaStream kafkaStream =
+ new KafkaStream(
+ streamId,
+ Collections.singletonMap(
+ clusterId,
+ new ClusterMetadata(
+ Collections.singleton(topic),
clusterProperties)));
+ MockKafkaMetadataService mockKafkaMetadataService =
+ new
MockKafkaMetadataService(Collections.singleton(kafkaStream));
+
+ try (MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context =
+ new MockSplitEnumeratorContext<>(2);
+ DynamicKafkaSourceEnumerator enumerator =
+ new DynamicKafkaSourceEnumerator(
+ new
KafkaStreamSetSubscriber(Collections.singleton(streamId)),
+ mockKafkaMetadataService,
+ context,
+ OffsetsInitializer.earliest(),
+ new NoStoppingOffsetsInitializer(),
+ properties,
+ Boundedness.CONTINUOUS_UNBOUNDED,
+ restoredState)) {
+ enumerator.start();
+ registerReader(context, enumerator, 0);
+ registerReader(context, enumerator, 1);
+ runAllOneTimeCallables(context);
+
+ List<DynamicKafkaSourceSplit> assignedSplits =
+ context.getSplitsAssignmentSequence().stream()
+ .map(SplitsAssignment::assignment)
+ .flatMap(assignments ->
assignments.values().stream())
+ .flatMap(Collection::stream)
+ .collect(Collectors.toList());
+
+ assertThat(assignedSplits).isNotEmpty();
+ assertThat(assignedSplits)
+ .allSatisfy(
+ split ->
+
assertThat(split.getKafkaClusterId()).isEqualTo(clusterId));
+ assertThat(assignedSplits)
+ .allSatisfy(
+ split ->
+ assertThat(
+
split.getKafkaPartitionSplit()
+
.getTopicPartition()
+ .topic())
+ .isEqualTo(topic));
+
+ DynamicKafkaSourceEnumState snapshot =
enumerator.snapshotState(1L);
+ ClusterMetadata snapshotMetadata =
+ snapshot.getKafkaStreams().stream()
+ .filter(stream ->
stream.getStreamId().equals(streamId))
+ .findFirst()
+ .orElseThrow()
+ .getClusterMetadataMap()
+ .get(clusterId);
+
assertThat(snapshotMetadata.getStartingOffsetsInitializer()).isNull();
+
assertThat(snapshotMetadata.getStoppingOffsetsInitializer()).isNull();
+ }
+ }
+
@Test
void testMigrationUsingFileMetadataService() throws Throwable {
// setup topics on two clusters
@@ -795,6 +1081,22 @@ public class DynamicKafkaSourceITTest extends TestLogger {
.collect(Collectors.toSet());
}
+ private void registerReader(
+ MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context,
+ DynamicKafkaSourceEnumerator enumerator,
+ int readerId) {
+ context.registerReader(new ReaderInfo(readerId, "location " +
readerId));
+ enumerator.addReader(readerId);
+ enumerator.handleSourceEvent(readerId, new
GetMetadataUpdateEvent());
+ }
+
+ private void runAllOneTimeCallables(
+ MockSplitEnumeratorContext<DynamicKafkaSourceSplit> context)
throws Throwable {
+ while (!context.getOneTimeCallables().isEmpty()) {
+ context.runNextOneTimeCallable();
+ }
+ }
+
private Set<KafkaStream> getKafkaStreams(
String kafkaClusterId, Properties properties,
Collection<String> topics) {
return topics.stream()
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
index 251309bc..1473bbdc 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/enumerator/DynamicKafkaSourceEnumStateSerializerTest.java
@@ -20,17 +20,21 @@ package
org.apache.flink.connector.kafka.dynamic.source.enumerator;
import org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
+import
org.apache.flink.connector.kafka.dynamic.source.testutils.DynamicKafkaSourceEnumStateTestUtils;
import org.apache.flink.connector.kafka.source.enumerator.AssignmentStatus;
import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
import
org.apache.flink.connector.kafka.source.enumerator.SplitAndAssignmentStatus;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.connector.kafka.source.split.KafkaPartitionSplit;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
import org.apache.kafka.clients.CommonClientConfigs;
+import org.apache.kafka.clients.consumer.OffsetResetStrategy;
import org.apache.kafka.common.TopicPartition;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.Properties;
import java.util.Set;
@@ -56,6 +60,10 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
propertiesForCluster1.setProperty(
CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+ OffsetsInitializer cluster0StartingOffsetsInitializer =
OffsetsInitializer.earliest();
+ OffsetsInitializer cluster0StoppingOffsetsInitializer =
+
OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST);
+
Set<KafkaStream> kafkaStreams =
ImmutableSet.of(
new KafkaStream(
@@ -64,7 +72,9 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
"cluster0",
new ClusterMetadata(
ImmutableSet.of("topic0",
"topic1"),
- propertiesForCluster0),
+ propertiesForCluster0,
+
cluster0StartingOffsetsInitializer,
+
cluster0StoppingOffsetsInitializer),
"cluster1",
new ClusterMetadata(
ImmutableSet.of("topic2",
"topic3"),
@@ -98,7 +108,7 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
DynamicKafkaSourceEnumState dynamicKafkaSourceEnumStateAfterSerde =
dynamicKafkaSourceEnumStateSerializer.deserialize(
- 1,
+ dynamicKafkaSourceEnumStateSerializer.getVersion(),
dynamicKafkaSourceEnumStateSerializer.serialize(
dynamicKafkaSourceEnumState));
@@ -107,6 +117,92 @@ public class DynamicKafkaSourceEnumStateSerializerTest {
.isEqualTo(dynamicKafkaSourceEnumStateAfterSerde);
}
+ @Test
+ public void testSerdeWithPartialOffsetsInitializers() throws Exception {
+ DynamicKafkaSourceEnumStateSerializer
dynamicKafkaSourceEnumStateSerializer =
+ new DynamicKafkaSourceEnumStateSerializer();
+
+ Properties propertiesForCluster0 = new Properties();
+ propertiesForCluster0.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster0:9092");
+ Properties propertiesForCluster1 = new Properties();
+ propertiesForCluster1.setProperty(
+ CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG, "cluster1:9092");
+
+ OffsetsInitializer startingOffsetsInitializer =
OffsetsInitializer.earliest();
+ OffsetsInitializer stoppingOffsetsInitializer =
OffsetsInitializer.latest();
+
+ Set<KafkaStream> kafkaStreams =
+ ImmutableSet.of(
+ new KafkaStream(
+ "stream0",
+ ImmutableMap.of(
+ "cluster0",
+ new ClusterMetadata(
+ ImmutableSet.of("topic0"),
+ propertiesForCluster0,
+ startingOffsetsInitializer,
+ null),
+ "cluster1",
+ new ClusterMetadata(
+ ImmutableSet.of("topic1"),
+ propertiesForCluster1,
+ null,
+ stoppingOffsetsInitializer))));
+
+ DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState =
+ new DynamicKafkaSourceEnumState(kafkaStreams,
Collections.emptyMap());
+
+ DynamicKafkaSourceEnumState dynamicKafkaSourceEnumStateAfterSerde =
+ dynamicKafkaSourceEnumStateSerializer.deserialize(
+ dynamicKafkaSourceEnumStateSerializer.getVersion(),
+ dynamicKafkaSourceEnumStateSerializer.serialize(
+ dynamicKafkaSourceEnumState));
+
+ KafkaStream kafkaStream =
+
dynamicKafkaSourceEnumStateAfterSerde.getKafkaStreams().iterator().next();
+ ClusterMetadata cluster0Metadata =
kafkaStream.getClusterMetadataMap().get("cluster0");
+
assertThat(cluster0Metadata.getStartingOffsetsInitializer()).isNotNull();
+
assertThat(cluster0Metadata.getStartingOffsetsInitializer().getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.EARLIEST);
+ assertThat(cluster0Metadata.getStoppingOffsetsInitializer()).isNull();
+
+ ClusterMetadata cluster1Metadata =
kafkaStream.getClusterMetadataMap().get("cluster1");
+ assertThat(cluster1Metadata.getStartingOffsetsInitializer()).isNull();
+
assertThat(cluster1Metadata.getStoppingOffsetsInitializer()).isNotNull();
+
assertThat(cluster1Metadata.getStoppingOffsetsInitializer().getAutoOffsetResetStrategy())
+ .isEqualTo(OffsetResetStrategy.LATEST);
+ }
+
+ @Test
+ public void testDeserializeV1State() throws Exception {
+ DynamicKafkaSourceEnumStateSerializer
dynamicKafkaSourceEnumStateSerializer =
+ new DynamicKafkaSourceEnumStateSerializer();
+
+ byte[] serializedState =
+ DynamicKafkaSourceEnumStateTestUtils.serializeV1State(
+ "stream0",
+ "cluster0",
+ ImmutableSet.of("topic0", "topic1"),
+ "cluster0:9092");
+
+ DynamicKafkaSourceEnumState dynamicKafkaSourceEnumState =
+ dynamicKafkaSourceEnumStateSerializer.deserialize(1,
serializedState);
+
+
assertThat(dynamicKafkaSourceEnumState.getClusterEnumeratorStates()).isEmpty();
+ KafkaStream kafkaStream =
dynamicKafkaSourceEnumState.getKafkaStreams().iterator().next();
+ assertThat(kafkaStream.getStreamId()).isEqualTo("stream0");
+ ClusterMetadata clusterMetadata =
kafkaStream.getClusterMetadataMap().get("cluster0");
+
assertThat(clusterMetadata.getTopics()).containsExactlyInAnyOrder("topic0",
"topic1");
+ assertThat(
+ clusterMetadata
+ .getProperties()
+
.getProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG))
+ .isEqualTo("cluster0:9092");
+ assertThat(clusterMetadata.getStartingOffsetsInitializer()).isNull();
+ assertThat(clusterMetadata.getStoppingOffsetsInitializer()).isNull();
+ }
+
private static SplitAndAssignmentStatus getSplitAssignment(
String topic, int partition, AssignmentStatus assignStatus) {
return new SplitAndAssignmentStatus(
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
index 4e1fcf0c..81404c55 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/metadata/SingleClusterTopicMetadataServiceTest.java
@@ -22,6 +22,7 @@ import
org.apache.flink.connector.kafka.dynamic.metadata.ClusterMetadata;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaMetadataService;
import org.apache.flink.connector.kafka.dynamic.metadata.KafkaStream;
import
org.apache.flink.connector.kafka.dynamic.metadata.SingleClusterTopicMetadataService;
+import
org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import
org.apache.flink.streaming.connectors.kafka.DynamicKafkaSourceTestHelper;
import org.apache.flink.streaming.connectors.kafka.KafkaTestBase;
@@ -114,4 +115,87 @@ class SingleClusterTopicMetadataServiceTest {
.as("the stream topic cannot be found in kafka and we rethrow")
.hasRootCauseInstanceOf(UnknownTopicOrPartitionException.class);
}
+
+ @Test
+ void describeStreamsIncludesOffsetsInitializers() throws Exception {
+ OffsetsInitializer startingOffsetsInitializer =
OffsetsInitializer.earliest();
+ OffsetsInitializer stoppingOffsetsInitializer =
OffsetsInitializer.latest();
+
+ KafkaMetadataService metadataService =
+ new SingleClusterTopicMetadataService(
+ kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+ kafkaClusterTestEnvMetadata0.getStandardProperties(),
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer);
+
+ try {
+ Map<String, KafkaStream> streamMap =
+
metadataService.describeStreams(Collections.singleton(TOPIC0));
+ ClusterMetadata clusterMetadata =
+ streamMap
+ .get(TOPIC0)
+ .getClusterMetadataMap()
+
.get(kafkaClusterTestEnvMetadata0.getKafkaClusterId());
+
+ assertThat(clusterMetadata.getStartingOffsetsInitializer())
+ .isSameAs(startingOffsetsInitializer);
+ assertThat(clusterMetadata.getStoppingOffsetsInitializer())
+ .isSameAs(stoppingOffsetsInitializer);
+ } finally {
+ metadataService.close();
+ }
+ }
+
+ @Test
+ void describeStreamsAllowsNullOffsetsInitializers() throws Exception {
+ assertOffsetsInitializers(null, null);
+ }
+
+ @Test
+ void describeStreamsAllowsStartingOffsetsOnly() throws Exception {
+ assertOffsetsInitializers(OffsetsInitializer.earliest(), null);
+ }
+
+ @Test
+ void describeStreamsAllowsStoppingOffsetsOnly() throws Exception {
+ assertOffsetsInitializers(null, OffsetsInitializer.latest());
+ }
+
+ private void assertOffsetsInitializers(
+ OffsetsInitializer startingOffsetsInitializer,
+ OffsetsInitializer stoppingOffsetsInitializer)
+ throws Exception {
+ KafkaMetadataService metadataService =
+ new SingleClusterTopicMetadataService(
+ kafkaClusterTestEnvMetadata0.getKafkaClusterId(),
+ kafkaClusterTestEnvMetadata0.getStandardProperties(),
+ startingOffsetsInitializer,
+ stoppingOffsetsInitializer);
+
+ try {
+ Map<String, KafkaStream> streamMap =
+
metadataService.describeStreams(Collections.singleton(TOPIC0));
+ ClusterMetadata clusterMetadata =
+ streamMap
+ .get(TOPIC0)
+ .getClusterMetadataMap()
+
.get(kafkaClusterTestEnvMetadata0.getKafkaClusterId());
+
+ if (startingOffsetsInitializer == null) {
+
assertThat(clusterMetadata.getStartingOffsetsInitializer()).isNull();
+ } else {
+ assertThat(clusterMetadata.getStartingOffsetsInitializer())
+ .isSameAs(startingOffsetsInitializer);
+ }
+
+ if (stoppingOffsetsInitializer == null) {
+
assertThat(clusterMetadata.getStoppingOffsetsInitializer()).isNull();
+ } else {
+ assertThat(clusterMetadata.getStoppingOffsetsInitializer())
+ .isSameAs(stoppingOffsetsInitializer);
+ }
+ } finally {
+ metadataService.close();
+ }
+ }
}
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java
new file mode 100644
index 00000000..f6038784
--- /dev/null
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/dynamic/source/testutils/DynamicKafkaSourceEnumStateTestUtils.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kafka.dynamic.source.testutils;
+
+import
org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumStateSerializer;
+
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Set;
+
+/** Test utilities for DynamicKafkaSource enum state serialization. */
+public final class DynamicKafkaSourceEnumStateTestUtils {
+ private DynamicKafkaSourceEnumStateTestUtils() {}
+
+ public static byte[] serializeV1State(
+ String streamId, String clusterId, Set<String> topics, String
bootstrapServers)
+ throws IOException {
+ KafkaSourceEnumStateSerializer kafkaSourceEnumStateSerializer =
+ new KafkaSourceEnumStateSerializer();
+ try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ DataOutputStream out = new DataOutputStream(baos)) {
+ out.writeInt(1);
+ out.writeUTF(streamId);
+ out.writeInt(1);
+ out.writeUTF(clusterId);
+ out.writeInt(topics.size());
+ for (String topic : topics) {
+ out.writeUTF(topic);
+ }
+ out.writeUTF(bootstrapServers);
+ out.writeInt(kafkaSourceEnumStateSerializer.getVersion());
+ out.writeInt(0);
+ return baos.toByteArray();
+ }
+ }
+}