This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit 91b048db64ce500b4716963b4ec7acb59e703370
Author: Arvid Heise <[email protected]>
AuthorDate: Wed Feb 19 21:37:00 2025 +0100

    [FLINK-34554] Adding strategies to table API
---
 .../DynamicKafkaRecordSerializationSchema.java     | 52 ++++++++++++++++++-
 .../kafka/table/KafkaConnectorOptions.java         | 25 +++++++++
 .../connectors/kafka/table/KafkaDynamicSink.java   | 11 +++-
 .../kafka/table/KafkaDynamicTableFactory.java      | 15 ++++--
 .../table/UpsertKafkaDynamicTableFactory.java      |  8 ++-
 .../connector/kafka/sink/KafkaSinkITCase.java      |  3 +-
 .../kafka/table/KafkaDynamicTableFactoryTest.java  | 59 ++++++++++++++++----
 .../connectors/kafka/table/KafkaTableITCase.java   | 51 +++++++++++++++++++
 .../table/UpsertKafkaDynamicTableFactoryTest.java  | 54 +++++++++++++++++---
 9 files changed, 253 insertions(+), 25 deletions(-)

diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
index 229b08b5..f590cc37 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/DynamicKafkaRecordSerializationSchema.java
@@ -19,6 +19,15 @@ package org.apache.flink.streaming.connectors.kafka.table;
 
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.DefaultKafkaDatasetIdentifier;
+import org.apache.flink.connector.kafka.lineage.DefaultTypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.KafkaDatasetFacetProvider;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacet;
+import org.apache.flink.connector.kafka.lineage.TypeDatasetFacetProvider;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
@@ -27,14 +36,17 @@ import org.apache.flink.table.data.RowData;
 import org.apache.flink.types.RowKind;
 import org.apache.flink.util.Preconditions;
 
+import com.google.common.reflect.TypeToken;
 import org.apache.kafka.clients.producer.ProducerRecord;
 
 import javax.annotation.Nullable;
 
+import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.Set;
 import java.util.regex.Pattern;
 
@@ -42,7 +54,10 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /** SerializationSchema used by {@link KafkaDynamicSink} to configure a {@link KafkaSink}. */
 @Internal
-class DynamicKafkaRecordSerializationSchema implements KafkaRecordSerializationSchema<RowData> {
+class DynamicKafkaRecordSerializationSchema
+        implements KafkaRecordSerializationSchema<RowData>,
+                KafkaDatasetFacetProvider,
+                TypeDatasetFacetProvider {
 
     private final Set<String> topics;
     private final Pattern topicPattern;
@@ -170,6 +185,41 @@ class DynamicKafkaRecordSerializationS
         valueSerialization.open(context);
     }
 
+    @Override
+    public Optional<KafkaDatasetFacet> getKafkaDatasetFacet() {
+        if (topics != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofTopics(new ArrayList<>(topics))));
+        }
+        if (topicPattern != null) {
+            return Optional.of(
+                    new DefaultKafkaDatasetFacet(
+                            DefaultKafkaDatasetIdentifier.ofPattern(topicPattern)));
+        }
+        return Optional.empty();
+    }
+
+    @Override
+    public Optional<TypeDatasetFacet> getTypeDatasetFacet() {
+        if (this.valueSerialization instanceof ResultTypeQueryable) {
+            return Optional.of(
+                    new DefaultTypeDatasetFacet(
+                            ((ResultTypeQueryable<?>) this.valueSerialization).getProducedType()));
+        } else {
+            // gets type information from serialize method signature
+            TypeToken serializationSchemaType = TypeToken.of(valueSerialization.getClass());
+            Class parameterType =
+                    serializationSchemaType
+                            .resolveType(SerializationSchema.class.getTypeParameters()[0])
+                            .getRawType();
+            if (parameterType != Object.class) {
+                return Optional.of(new DefaultTypeDatasetFacet(TypeInformation.of(parameterType)));
+            }
+        }
+        return Optional.empty();
+    }
+
     private String getTargetTopic(RowData element) {
         if (topics != null && topics.size() == 1) {
             // If topics is a singleton list, we only return the provided topic.
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
index c64ab0be..0639e812 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaConnectorOptions.java
@@ -25,6 +25,7 @@ import org.apache.flink.configuration.DescribedEnum;
 import org.apache.flink.configuration.description.Description;
 import org.apache.flink.configuration.description.InlineElement;
 import org.apache.flink.connector.base.DeliveryGuarantee;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.table.factories.FactoryUtil;
 
 import java.time.Duration;
@@ -272,6 +273,30 @@ public class KafkaConnectorOptions {
                                     + DeliveryGuarantee.EXACTLY_ONCE
                                     + " this value is used a prefix for the identifier of all opened Kafka transactions.");
 
+    /**
+     * The strategy to name transactions. Naming strategy has implications on the resource
+     * consumption on the broker because each unique transaction name requires the broker to keep
+     * some metadata in memory for 7 days.
+     *
+     * <p>All naming strategies use the format {@code transactionalIdPrefix-subtask-offset} where
+     * offset is calculated differently.
+     */
+    public static final ConfigOption<TransactionNamingStrategy> TRANSACTION_NAMING_STRATEGY =
+            ConfigOptions.key("sink.transaction-naming-strategy")
+                    .enumType(TransactionNamingStrategy.class)
+                    .defaultValue(TransactionNamingStrategy.DEFAULT)
+                    .withDescription(
+                            Description.builder()
+                                    .text(
+                                            "Advanced option to influence how transactions are named.")
+                                    .linebreak()
+                                    .text(
+                                            "INCREMENTING is the strategy used in flink-kafka-connector 3.X (DEFAULT). It wastes memory of the Kafka broker but works with older Kafka broker versions (Kafka 2.X).")
+                                    .linebreak()
+                                    .text(
+                                            "POOLING is a new strategy introduced in flink-kafka-connector 4.X. It is more resource-friendly than INCREMENTING but requires Kafka 3.0+. Switching to this strategy requires a checkpoint taken with flink-kafka-connector 4.X or a snapshot taken with earlier versions.")
+                                    .build());
+
     // --------------------------------------------------------------------------------------------
     // Enums
     // --------------------------------------------------------------------------------------------
 
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
index 3b2fa4dc..d6b1902e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java
@@ -27,6 +27,7 @@ import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
 import org.apache.flink.connector.kafka.sink.KafkaSinkBuilder;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.table.api.DataTypes;
@@ -139,6 +140,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
     /** Parallelism of the physical Kafka producer. * */
     protected final @Nullable Integer parallelism;
 
+    private final TransactionNamingStrategy transactionNamingStrategy;
+
     public KafkaDynamicSink(
             DataType consumedDataType,
             DataType physicalDataType,
@@ -155,7 +158,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
             boolean upsertMode,
             SinkBufferFlushMode flushMode,
             @Nullable Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         // Format attributes
         this.consumedDataType =
                 checkNotNull(consumedDataType, "Consumed data type must not be null.");
@@ -168,6 +172,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
         this.valueProjection = checkNotNull(valueProjection, "Value projection must not be null.");
         this.keyPrefix = keyPrefix;
         this.transactionalIdPrefix = transactionalIdPrefix;
+        this.transactionNamingStrategy = transactionNamingStrategy;
         // Mutable attributes
         this.metadataKeys = Collections.emptyList();
         // Kafka-specific attributes
@@ -222,6 +227,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                                 hasMetadata(),
                                 getMetadataPositions(physicalChildren),
                                 upsertMode))
+                        .setTransactionNamingStrategy(transactionNamingStrategy)
                         .build();
         if (flushMode.isEnabled() && upsertMode) {
             return new DataStreamSinkProvider() {
@@ -292,7 +298,8 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada
                         upsertMode,
                         flushMode,
                         parallelism,
-                        transactionalIdPrefix);
+                        transactionalIdPrefix,
+                        transactionNamingStrategy);
         copy.metadataKeys = metadataKeys;
         return copy;
     }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
index 3826afc4..1446637e 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactory.java
@@ -27,6 +27,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.streaming.connectors.kafka.config.BoundedMode;
 import org.apache.flink.streaming.connectors.kafka.config.StartupMode;
@@ -84,6 +85,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -154,6 +156,7 @@ public class KafkaDynamicTableFactory
         options.add(SCAN_BOUNDED_SPECIFIC_OFFSETS);
         options.add(SCAN_BOUNDED_TIMESTAMP_MILLIS);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
@@ -171,7 +174,8 @@ public class KafkaDynamicTableFactory
                         SCAN_PARALLELISM,
                         SINK_PARTITIONER,
                         SINK_PARALLELISM,
-                        TRANSACTIONAL_ID_PREFIX)
+                        TRANSACTIONAL_ID_PREFIX,
+                        TRANSACTION_NAMING_STRATEGY)
                 .collect(Collectors.toSet());
     }
 
@@ -290,7 +294,8 @@ public class KafkaDynamicTableFactory
                 getFlinkKafkaPartitioner(tableOptions, context.getClassLoader()).orElse(null),
                 deliveryGuarantee,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     // --------------------------------------------------------------------------------------------
@@ -438,7 +443,8 @@ public class KafkaDynamicTableFactory
             KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             Integer parallelism,
-            @Nullable String transactionalIdPrefix) {
+            @Nullable String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -455,6 +461,7 @@ public class KafkaDynamicTableFactory
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 }
diff --git a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
index 275aebd3..c421c4ef 100644
--- a/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
+++ b/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactory.java
@@ -69,6 +69,7 @@ import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOp
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TOPIC_PATTERN;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTIONAL_ID_PREFIX;
+import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.TRANSACTION_NAMING_STRATEGY;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FIELDS_INCLUDE;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptions.VALUE_FORMAT;
 import static org.apache.flink.streaming.connectors.kafka.table.KafkaConnectorOptionsUtil.PROPERTIES_PREFIX;
@@ -117,12 +118,14 @@ public class UpsertKafkaDynamicTableFactory
         options.add(DELIVERY_GUARANTEE);
         options.add(TRANSACTIONAL_ID_PREFIX);
         options.add(SCAN_PARALLELISM);
+        options.add(TRANSACTION_NAMING_STRATEGY);
         return options;
     }
 
     @Override
     public Set<ConfigOption<?>> forwardOptions() {
-        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX).collect(Collectors.toSet());
+        return Stream.of(DELIVERY_GUARANTEE, TRANSACTIONAL_ID_PREFIX, TRANSACTION_NAMING_STRATEGY)
+                .collect(Collectors.toSet());
     }
 
     @Override
@@ -227,7 +230,8 @@ public class UpsertKafkaDynamicTableFactory
                 true,
                 flushMode,
                 parallelism,
-                tableOptions.get(TRANSACTIONAL_ID_PREFIX));
+                tableOptions.get(TRANSACTIONAL_ID_PREFIX),
+                tableOptions.get(TRANSACTION_NAMING_STRATEGY));
     }
 
     private Tuple2<int[], int[]> createKeyValueProjections(ResolvedCatalogTable catalogTable) {
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
index c91b9bbb..f94a9354 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java
@@ -514,7 +514,8 @@ public class KafkaSinkITCase extends TestLogger {
                     .get()
                     .asInstanceOf(InstanceOfAssertFactories.THROWABLE)
                     .rootCause()
-                    .hasMessageContaining("Attempted to switch back to INCREMENTING");
+                    .hasMessageContaining(
+                            "Attempted to switch the transaction naming strategy back to INCREMENTING");
         }
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
index 156cda66..4417346c 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicTableFactoryTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceOptions;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
@@ -110,7 +111,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** Tests for {@link KafkaDynamicTableFactory}. */
 @ExtendWith(TestLoggerExtension.class)
 public class KafkaDynamicTableFactoryTest {
-
     private static final String TOPIC = "myTopic";
     private static final String TOPICS = "myTopic-1;myTopic-2;myTopic-3";
     private static final String TOPIC_REGEX = "myTopic-\\d+";
@@ -652,7 +652,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         // Test kafka producer.
@@ -697,7 +698,41 @@ public class KafkaDynamicTableFactoryTest {
                             new FlinkFixedPartitioner<>(),
                             DeliveryGuarantee.valueOf(semantic.toUpperCase().replace("-", "_")),
                             null,
-                            "kafka-sink");
+                            "kafka-sink",
+                            TransactionNamingStrategy.DEFAULT);
+            assertThat(actualSink).isEqualTo(expectedSink);
+        }
+    }
+
+    @Test
+    public void testTableSinkStrategyTranslation() {
+        for (TransactionNamingStrategy namingStrategy : TransactionNamingStrategy.values()) {
+            final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                    new EncodingFormatMock(",");
+            final Map<String, String> modifiedOptions =
+                    getModifiedOptions(
+                            getBasicSinkOptions(),
+                            options -> {
+                                options.put(
+                                        "sink.transaction-naming-strategy", namingStrategy.name());
+                            });
+            final DynamicTableSink actualSink = createTableSink(SCHEMA, modifiedOptions);
+            final DynamicTableSink expectedSink =
+                    createExpectedSink(
+                            SCHEMA_DATA_TYPE,
+                            null,
+                            valueEncodingFormat,
+                            new int[0],
+                            new int[] {0, 1, 2},
+                            null,
+                            Collections.singletonList(TOPIC),
+                            null,
+                            KAFKA_SINK_PROPERTIES,
+                            new FlinkFixedPartitioner<>(),
+                            DeliveryGuarantee.EXACTLY_ONCE,
+                            null,
+                            "kafka-sink",
+                            namingStrategy);
             assertThat(actualSink).isEqualTo(expectedSink);
         }
     }
@@ -741,7 +776,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
     }
 
@@ -770,7 +806,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         100,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
@@ -882,7 +919,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
         final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
         assertThat(actualKafkaSink.listWritableMetadata())
@@ -920,7 +958,8 @@ public class KafkaDynamicTableFactoryTest {
                         new FlinkFixedPartitioner<>(),
                         DeliveryGuarantee.EXACTLY_ONCE,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
         final KafkaDynamicSink actualKafkaSink = (KafkaDynamicSink) actualSink;
         assertThat(actualKafkaSink.listWritableMetadata())
@@ -1333,7 +1372,8 @@ public class KafkaDynamicTableFactoryTest {
             @Nullable KafkaPartitioner<RowData> partitioner,
             DeliveryGuarantee deliveryGuarantee,
             @Nullable Integer parallelism,
-            String transactionalIdPrefix) {
+            String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 physicalDataType,
                 physicalDataType,
@@ -1350,7 +1390,8 @@ public class KafkaDynamicTableFactoryTest {
                 false,
                 SinkBufferFlushMode.DISABLED,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 
     /**
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
index bef07128..0a84ecb2 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableITCase.java
@@ -22,7 +22,9 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.CoreOptions;
 import org.apache.flink.configuration.RestartStrategyOptions;
 import org.apache.flink.configuration.StateRecoveryOptions;
+import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaPartitioner;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.core.execution.JobClient;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.runtime.messages.FlinkJobTerminatedWithoutCancellationException;
@@ -357,6 +359,55 @@ public class KafkaTableITCase extends KafkaTableTestBase {
         cleanupTopic(topic2);
     }
 
+    @Test
+    public void testExactlyOnceSink() throws Exception {
+        // we always use a different topic name for each parameterized topic,
+        // in order to make sure the topic can be created.
+        final String topic = "topics_" + format + "_" + UUID.randomUUID();
+        createTestTopic(topic, 1, 1);
+
+        // ---------- Produce an event time stream into Kafka -------------------
+        String bootstraps = getBootstrapServers();
+        tEnv.executeSql(
+                String.format(
+                        "CREATE TABLE sink (\n"
+                                + "  `user_id` INT,\n"
+                                + "  `item_id` INT,\n"
+                                + "  `behavior` STRING\n"
+                                + ") WITH (\n"
+                                + "  'connector' = '%s',\n"
+                                + "  'sink.delivery-guarantee' = '%s',\n"
+                                + "  'sink.transactional-id-prefix' = '%s',\n"
+                                + "  'sink.transaction-naming-strategy' = '%s',\n"
+                                + "  'topic' = '%s',\n"
+                                + "  'properties.bootstrap.servers' = '%s',\n"
+                                + "  'scan.startup.mode' = 'earliest-offset',\n"
+                                + "  'scan.bounded.mode' = 'latest-offset',\n"
+                                + "  %s\n"
+                                + ")\n",
+                        KafkaDynamicTableFactory.IDENTIFIER,
+                        DeliveryGuarantee.EXACTLY_ONCE,
+                        topic, // use topic as transactional id prefix - it's unique
+                        TransactionNamingStrategy.POOLING,
+                        topic,
+                        bootstraps,
+                        formatOptions()));
+
+        List<Row> values =
+                Arrays.asList(Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+        tEnv.fromValues(values).insertInto("sink").execute().await();
+
+        // ---------- Consume stream from Kafka -------------------
+        List<Row> results = collectAllRows(tEnv.sqlQuery("SELECT * from sink"));
+        assertThat(results)
+                .containsExactlyInAnyOrder(
+                        Row.of(1, 1102, "behavior 1"), Row.of(2, 1103, "behavior 2"));
+
+        // ------------- cleanup -------------------
+
+        cleanupTopic(topic);
+    }
+
     @Test
     public void testKafkaSourceEmptyResultOnDeletedOffsets() throws Exception {
         // we always use a different topic name for each parameterized topic,
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
index d1f13879..5bc42f3b 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/UpsertKafkaDynamicTableFactoryTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.connector.source.Boundedness;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.connector.base.DeliveryGuarantee;
 import org.apache.flink.connector.kafka.sink.KafkaSink;
+import org.apache.flink.connector.kafka.sink.TransactionNamingStrategy;
 import org.apache.flink.connector.kafka.source.KafkaSource;
 import org.apache.flink.connector.kafka.source.KafkaSourceTestUtils;
 import org.apache.flink.connector.kafka.source.enumerator.KafkaSourceEnumState;
@@ -271,7 +272,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -313,7 +315,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -358,7 +361,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         new SinkBufferFlushMode(100, 1000L),
                         null,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
 
         // Test sink format.
         final KafkaDynamicSink actualUpsertKafkaSink = (KafkaDynamicSink) actualSink;
@@ -413,7 +417,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                         DeliveryGuarantee.EXACTLY_ONCE,
                         SinkBufferFlushMode.DISABLED,
                         100,
-                        "kafka-sink");
+                        "kafka-sink",
+                        TransactionNamingStrategy.DEFAULT);
         assertThat(actualSink).isEqualTo(expectedSink);
 
         final DynamicTableSink.SinkRuntimeProvider provider =
@@ -424,6 +429,41 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
         assertThat((long) sinkProvider.getParallelism().get()).isEqualTo(100);
     }
 
+    @Test
+    public void testTableSinkStrategyTranslation() {
+        for (TransactionNamingStrategy namingStrategy : TransactionNamingStrategy.values()) {
+            final EncodingFormat<SerializationSchema<RowData>> valueEncodingFormat =
+                    new TestFormatFactory.EncodingFormatMock(",");
+            final Map<String, String> modifiedOptions =
+                    getModifiedOptions(
+                            getFullSinkOptions(),
+                            options -> {
+                                options.put("sink.delivery-guarantee", "exactly-once");
+                                options.put("sink.transactional-id-prefix", "kafka-sink");
+                                options.put(
+                                        "sink.transaction-naming-strategy", namingStrategy.name());
+                            });
+            final DynamicTableSink actualSink = createTableSink(SINK_SCHEMA, modifiedOptions);
+            final DynamicTableSink expectedSink =
+                    createExpectedSink(
+                            SINK_SCHEMA.toPhysicalRowDataType(),
+                            keyEncodingFormat,
+                            valueEncodingFormat,
+                            SINK_KEY_FIELDS,
+                            SINK_VALUE_FIELDS,
+                            null,
+                            Collections.singletonList(SINK_TOPIC),
+                            null,
+                            UPSERT_KAFKA_SINK_PROPERTIES,
+                            DeliveryGuarantee.EXACTLY_ONCE,
+                            SinkBufferFlushMode.DISABLED,
+                            null,
+                            "kafka-sink",
+                            namingStrategy);
+            assertThat(actualSink).isEqualTo(expectedSink);
+        }
+    }
+
     @Test
     public void testTableSinkAutoCompleteSchemaRegistrySubject() {
         // value.format + key.format
@@ -933,7 +973,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
             DeliveryGuarantee deliveryGuarantee,
             SinkBufferFlushMode flushMode,
             Integer parallelism,
-            String transactionalIdPrefix) {
+            String transactionalIdPrefix,
+            TransactionNamingStrategy transactionNamingStrategy) {
         return new KafkaDynamicSink(
                 consumedDataType,
                 consumedDataType,
@@ -950,7 +991,8 @@ public class UpsertKafkaDynamicTableFactoryTest extends TestLogger {
                 true,
                 flushMode,
                 parallelism,
-                transactionalIdPrefix);
+                transactionalIdPrefix,
+                transactionNamingStrategy);
     }
 
     private KafkaSource<?> assertKafkaSource(ScanTableSource.ScanRuntimeProvider provider) {
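
For anyone trying the new option from SQL, the testExactlyOnceSink ITCase added above shows the intended wiring. A minimal DDL sketch along the same lines; the connector identifier, topic, bootstrap servers, format, and prefix values below are illustrative placeholders rather than values taken from this commit:

    CREATE TABLE sink (
      `user_id` INT,
      `item_id` INT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'my-topic',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json',
      'sink.delivery-guarantee' = 'exactly-once',
      'sink.transactional-id-prefix' = 'my-unique-prefix',
      -- new in this commit: INCREMENTING (3.x behavior, default) or POOLING (requires Kafka 3.0+)
      'sink.transaction-naming-strategy' = 'POOLING'
    );

The option only matters with exactly-once delivery, where transactions are named from the transactional id prefix; per the option description, POOLING is the more broker-friendly choice on Kafka 3.0+, and the KafkaSinkITCase change shows that switching back to INCREMENTING afterwards is rejected.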
