This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 91b048db64ce500b4716963b4ec7acb59e703370
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Feb 19 21:37:00 2025 +0100

    [FLINK-34554] Adding strategies to table API
---
 .../DynamicKafkaRecordSerializationSchema.java     | 52 ++++++++++++++++++-
 .../kafka/table/KafkaConnectorOptions.java         | 25 +++++++++
 .../connectors/kafka/table/KafkaDynamicSink.java   | 11 +++-
 .../kafka/table/KafkaDynamicTableFactory.java      | 15 ++++--
 .../table/UpsertKafkaDynamicTableFactory.java      |  8 ++-
 .../connector/kafka/sink/KafkaSinkITCase.java      |  3 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 59 ++++++++++++++++++----
 .../connectors/kafka/table/KafkaTableITCase.java   | 51 +++++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  | 54 +++++++++++++++++---
 9 files changed, 253 insertions(+), 25 deletions(-)
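
For context, a minimal usage sketch of the option added by this commit; the connector identifier, topic, broker address, prefix, and format values below are illustrative placeholders, loosely mirroring the new KafkaTableITCase test:

    CREATE TABLE sink (
      `user_id` INT,
      `item_id` INT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'sink.delivery-guarantee' = 'exactly-once',
      'sink.transactional-id-prefix' = 'my-prefix',
      'sink.transaction-naming-strategy' = 'POOLING',  -- new option; POOLING requires Kafka 3.0+
      'format' = 'json'
    );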

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 229b08b5..f590cc37 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,6 +19,15 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -27,14 +36,17 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -42,7 +54,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
 @Internal
-class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
+class DynamicKafkaRecordSerializationSchema
+        implements KafkaRecordSerializationSchema<RowData>,
+                KafkaDatasetFacetProvider,
+                TypeDatasetFacetProvider {
 
     private final Set<String> topics;
     private final Pattern topicPattern;
@@ -170,6 +185,41 @@ class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationS
         valueSerialization.open(context);
     }
 
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        if (topics != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(new ArrayList<>(topics))));
+        }
+        if (topicPattern != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofPattern(topicPattern)));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+        if (this.valueSerialization instanceof ResultTypeQueryable) {
+            return Optional.of(
+                    new DefaultTypeDatasetFacet(
+                            ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
+        } else {
+            // gets type information from serialize method signature
+            TypeToken serializationSchemaType = TypeToken.of(valueSerialization.getClass());
+            Class parameterType =
+                    serializationSchemaType
+                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
+                            .getRawType();
+            if (parameterType != Object.class) {
+                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+            }
+        }
+        return Optional.empty();
+    }
+
     private String getTargetTopic(RowData element) {
         if (topics != null && topics.size() == 1) {
             // If topics is a singleton list, we only return the provided topic.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index c64ab0be..0639e812 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
@@ -272,6 +273,30 @@ public class KafkaConnectorOptions {
                                     + DeliveryGuarantee.EXACTLY_ONCE
                                     + " this value is used a prefix for the 
identifier of all opened Kafka transactions.");
 
+    /**
+     * The strategy to name transactions. Naming strategy has implications on the resource
+     * consumption on the broker because each unique transaction name requires the broker to keep
+     * some metadata in memory for 7 days.
+     *
+     * <p>All naming strategies use the format {@code transactionalIdPrefix-subtask-offset} where
+     * offset is calculated differently.
+     */
+    public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =
+            ConfigOptions.key("sink.transaction-naming-strategy")
+                    .enumType(TransactionNamingStrategy.class)
+                    .defaultValue(TransactionNamingStrategy.DEFAULT)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Advanced option to influence how transactions are named.")
+                                    .linebreak()
+                                    .text(
+                                            "INCREMENTING is the strategy used in flink-kafka-connector 3.X (DEFAULT). It wastes memory of the Kafka broker but works with older Kafka broker versions (Kafka 2.X).")
+                                    .linebreak()
+                                    .text(
+                                            "POOLING is a new strategy introduced in flink-kafka-connector 4.X. It is more resource-friendly than INCREMENTING but requires Kafka 3.0+. Switching to this strategy requires a checkpoint taken with flink-kafka-connector 4.X or a snapshot taken with earlier versions.")
+                                    .build());
+
     // --------------------------------------------------------------------------------------------
     // Enums
     // --------------------------------------------------------------------------------------------
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 3b2fa4dc..d6b1902e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.DataTypes;
@@ -139,6 +140,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /** Parallelism of the physical Kafka producer. * */
     protected final @Nullable Integer parallelism;
 
+    private final TransactionNamingStrategy transactionNamingStrategy;
+
     public KafkaDynamicSink(
             DataType consumedDataType,
             DataType physicalDataType,
@@ -155,7 +158,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
             @Nullable Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         // Format attributes
         this.consumedDataType =
                 checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -168,6 +172,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
         this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
         this.keyPrefix = keyPrefix;
         this.transactionalIdPrefix = transactionalIdPrefix;
+        this.transactionNamingStrategy = transactionNamingStrategy;
         // Mutable attributes
         this.metadataKeys = Collections.emptyList();
         // Kafka-specific attributes
@@ -222,6 +227,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                                         hasMetadata(),
                                         getMetadataPositions(physicalChildren),
                                         upsertMode))
+                        .setTransactionNamingStrategy(transactionNamingStrategy)
                         .build();
         if (flushMode.isEnabled() && upsertMode) {
             return new DataStreamSinkProvider() {
@@ -292,7 +298,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         upsertMode,
                         flushMode,
                         parallelism,
-                        transactionalIdPrefix);
+                        transactionalIdPrefix,
+                        transactionNamingStrategy);
         copy.metadataKeys = metadataKeys;
         return copy;
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 3826afc4..1446637e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
@@ -84,6 +85,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -154,6 +156,7 @@ public class KafkaDynamicTableFactory
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
@@ -171,7 +174,8 @@ public class KafkaDynamicTableFactory
                         SCAN_PARALLELISM,
                         SINK_PARTITIONER,
                         SINK_PARALLELISM,
-                        TRANSACTIONAL_ID_PREFIX)
+                        TRANSACTIONAL_ID_PREFIX,
+                        TRANSACTION_NAMING_STRATEGY)
                 .collect(Collectors.toSet());
     }
 
@@ -290,7 +294,8 @@ public class KafkaDynamicTableFactory
                 getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                 deliveryGuarantee,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -438,7 +443,8 @@ public class KafkaDynamicTableFactory
             KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -455,6 +461,7 @@ public class KafkaDynamicTableFactory
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 275aebd3..c421c4ef 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -69,6 +69,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -117,12 +118,14 @@ public class UpsertKafkaDynamicTableFactory
         options.add(DELIVERY_GUARANTEE);
         options.add(TRANSACTIONAL_ID_PREFIX);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
-        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX).collect(Collectors.toSet());
+        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX, TRANSACTION_NAMING_STRATEGY)
+                .collect(Collectors.toSet());
     }
 
     @Override
@@ -227,7 +230,8 @@ public class UpsertKafkaDynamicTableFactory
                 true,
                 flushMode,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index c91b9bbb..f94a9354 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -514,7 +514,8 @@ public class KafkaSinkITCase extends TestLogger {
                     .get()
                     .asInstanceOf(InstanceOfAssertFactories.THROWABLE)
                     .rootCause()
-                    .hasMessageContaining("Attempted to switch back to INCREMENTING");
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
         }
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 156cda66..4417346c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
@@ -110,7 +111,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link KafkaDynamicTableFactory}. */
 @ExtendWith(TestLoggerExtension.class)
 public class KafkaDynamicTableFactoryTest {
-
     private static final String TOPIC = "myTopic";
     private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
     private static final String TOPIC_REGEX = "myTopic-\\d+";
@@ -652,7 +652,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         // Test kafka producer.
@@ -697,7 +698,41 @@ public class KafkaDynamicTableFactoryTest {
                             new FlinkFixedPartitioner<>(),
                             DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
                             null,
-                            "kafka-sink");
+                            "kafka-sink",
+                            TransactionNamingStrategy.DEFAULT);
+            assertThat(actualSink).isEqualTo(expectedSink);
+        }
+    }
+
+    @Test
+    public void testTableSinkStrategyTranslation() {
+        for (TransactionNamingStrategy namingStrategy : TransactionNamingStrategy.values()) {
+            final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                    new EncodingFormatMock(",");
+            final Map<String, String> modifiedOptions =
+                    getModifiedOptions(
+                            getBasicSinkOptions(),
+                            options -> {
+                                options.put(
+                                        "sink.transaction-naming-strategy", namingStrategy.name());
+                            });
+            final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
+            final DynamicTableSink expectedSink =
+                    createExpectedSink(
+                            SCHEMA_DATA_TYPE,
+                            null,
+                            valueEncodingFormat,
+                            new int[0],
+                            new int[] {0, 1, 2},
+                            null,
+                            Collections.singletonList(TOPIC),
+                            null,
+                            KAFKA_SINK_PROPERTIES,
+                            new FlinkFixedPartitioner<>(),
+                            DeliveryGuarantee.EXACTLY_ONCE,
+                            null,
+                            "kafka-sink",
+                            namingStrategy);
             assertThat(actualSink).isEqualTo(expectedSink);
         }
     }
@@ -741,7 +776,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         assertThat(actualSink).isEqualTo(expectedSink);
     }
@@ -770,7 +806,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         100,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
@@ -882,7 +919,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
         final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
         assertThat(actualKafkaSink.listWritableMetadata())
@@ -920,7 +958,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
         final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
         assertThat(actualKafkaSink.listWritableMetadata())
@@ -1333,7 +1372,8 @@ public class KafkaDynamicTableFactoryTest {
             @Nullable KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             @Nullable Integer parallelism,
-            String transactionalIdPrefix) {
+            String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -1350,7 +1390,8 @@ public class KafkaDynamicTableFactoryTest {
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 
     /**
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index bef07128..0a84ecb2 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
@@ -357,6 +359,55 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic2);
     }
 
+    @Test
+    public void testExactlyOnceSink() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "topics_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String bootstraps = getBootstrapServers();
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE sink (\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'sink.delivery-guarantee' = '%s',\n"
+                                + "  'sink.transactional-id-prefix' = '%s',\n"
+                                + "  'sink.transaction-naming-strategy' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  'scan.bounded.mode' = 'latest-offset',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        topic, // use topic as transactional id prefix - it's unique
+                        TransactionNamingStrategy.POOLING,
+                        topic,
+                        bootstraps,
+                        formatOptions()));
+
+        List<Row> values =
+                Arrays.asList(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+        tEnv.fromValues(values).insertInto("sink").execute().await();
+
+        // ---------- Consume stream from Kafka -------------------
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from sink"));
+        assertThat(results)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+
+        // ------------- cleanup -------------------
+
+        cleanupTopic(topic);
+    }
+
     @Test
     public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
         // we always use a different topic name for each parameterized topic,
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index d1f13879..5bc42f3b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
@@ -271,7 +272,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -313,7 +315,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -358,7 +361,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         new SinkBufferFlushMode(100, 1000L),
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -413,7 +417,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         100,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
@@ -424,6 +429,41 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
         assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100);
     }
 
+    @Test
+    public void testTableSinkStrategyTranslation() {
+        for (TransactionNamingStrategy namingStrategy : TransactionNamingStrategy.values()) {
+            final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                    new TestFormatFactory.EncodingFormatMock(",");
+            final Map<String, String> modifiedOptions =
+                    getModifiedOptions(
+                            getFullSinkOptions(),
+                            options -> {
+                                options.put("sink.delivery-guarantee", "exactly-once");
+                                options.put("sink.transactional-id-prefix", "kafka-sink");
+                                options.put(
+                                        "sink.transaction-naming-strategy", namingStrategy.name());
+                            });
+            final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions);
+            final DynamicTableSink expectedSink =
+                    createExpectedSink(
+                            SINK_SCHEMA.toPhysicalRowDataType(),
+                            keyEncodingFormat,
+                            valueEncodingFormat,
+                            SINK_KEY_FIELDS,
+                            SINK_VALUE_FIELDS,
+                            null,
+                            Collections.singletonList(SINK_TOPIC),
+                            null,
+                            UPSERT_KAFKA_SINK_PROPERTIES,
+                            DeliveryGuarantee.EXACTLY_ONCE,
+                            SinkBufferFlushMode.DISABLED,
+                            null,
+                            "kafka-sink",
+                            namingStrategy);
+            assertThat(actualSink).isEqualTo(expectedSink);
+        }
+    }
+
     @Test
     public void testTableSinkAutoCompleteSchemaRegistrySubject() {
         // value.format + key.format
@@ -933,7 +973,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
             DeliveryGuarantee deliveryGuarantee,
             SinkBufferFlushMode flushMode,
             Integer parallelism,
-            String transactionalIdPrefix) {
+            String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 consumedDataType,
                 consumedDataType,
@@ -950,7 +991,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                 true,
                 flushMode,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 
     private KafkaSource<?> assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {

