This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 02567572533 [hotfix][connectors/kafka] Fix spelling of setDeliveryGuarantee method 02567572533 is described below commit 025675725336cd572aa2601be525efd4995e5b84 Author: Alexander Preuß <11444089+alp...@users.noreply.github.com> AuthorDate: Mon May 9 15:19:06 2022 +0200 [hotfix][connectors/kafka] Fix spelling of setDeliveryGuarantee method --- docs/content.zh/docs/connectors/datastream/kafka.md | 2 +- docs/content/docs/connectors/datastream/kafka.md | 2 +- .../flink/connector/kafka/sink/KafkaSinkBuilder.java | 16 +++++++++++++++- .../connectors/kafka/table/KafkaDynamicSink.java | 2 +- .../flink/connector/kafka/sink/KafkaSinkITCase.java | 6 +++--- .../kafka/sink/testutils/KafkaSinkExternalContext.java | 2 +- .../kafka/table/KafkaChangelogTableITCase.java | 2 +- 7 files changed, 23 insertions(+), 9 deletions(-) diff --git a/docs/content.zh/docs/connectors/datastream/kafka.md b/docs/content.zh/docs/connectors/datastream/kafka.md index 39010fbf791..30c15299b9c 100644 --- a/docs/content.zh/docs/connectors/datastream/kafka.md +++ b/docs/content.zh/docs/connectors/datastream/kafka.md @@ -323,7 +323,7 @@ KafkaSink<String> sink = KafkaSink.<String>builder() .setValueSerializationSchema(new SimpleStringSchema()) .build() ) - .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(sink); diff --git a/docs/content/docs/connectors/datastream/kafka.md b/docs/content/docs/connectors/datastream/kafka.md index 2b62ebd02f2..e630d5ea2f4 100644 --- a/docs/content/docs/connectors/datastream/kafka.md +++ b/docs/content/docs/connectors/datastream/kafka.md @@ -377,7 +377,7 @@ KafkaSink<String> sink = KafkaSink.<String>builder() .setValueSerializationSchema(new SimpleStringSchema()) .build() ) - .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) + .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .build(); stream.sinkTo(sink); diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java index 62e79547c23..809f205cb05 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/KafkaSinkBuilder.java @@ -50,7 +50,7 @@ import static org.apache.flink.util.Preconditions.checkState; * }</pre> * * <p>One can also configure different {@link DeliveryGuarantee} by using {@link - * #setDeliverGuarantee(DeliveryGuarantee)} but keep in mind when using {@link + * #setDeliveryGuarantee(DeliveryGuarantee)} but keep in mind when using {@link * DeliveryGuarantee#EXACTLY_ONCE} one must set the transactionalIdPrefix {@link * #setTransactionalIdPrefix(String)}. * @@ -94,6 +94,20 @@ public class KafkaSinkBuilder<IN> { * @param deliveryGuarantee * @return {@link KafkaSinkBuilder} */ + public KafkaSinkBuilder<IN> setDeliveryGuarantee(DeliveryGuarantee deliveryGuarantee) { + this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee"); + return this; + } + + /** + * Sets the wanted the {@link DeliveryGuarantee}. The default delivery guarantee is {@link + * #deliveryGuarantee}. + * + * @param deliveryGuarantee + * @return {@link KafkaSinkBuilder} + * @deprecated Will be removed in future versions. Use {@link #setDeliveryGuarantee} instead. + */ + @Deprecated public KafkaSinkBuilder<IN> setDeliverGuarantee(DeliveryGuarantee deliveryGuarantee) { this.deliveryGuarantee = checkNotNull(deliveryGuarantee, "deliveryGuarantee"); return this; diff --git a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java index 00bfda3ad38..8af1782ac95 100644 --- a/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java +++ b/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/table/KafkaDynamicSink.java @@ -198,7 +198,7 @@ public class KafkaDynamicSink implements DynamicTableSink, SupportsWritingMetada } final KafkaSink<RowData> kafkaSink = sinkBuilder - .setDeliverGuarantee(deliveryGuarantee) + .setDeliveryGuarantee(deliveryGuarantee) .setBootstrapServers( properties.get(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG).toString()) .setKafkaProducerConfig(properties) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java index be7a928fb5b..2ae414597a6 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaSinkITCase.java @@ -335,7 +335,7 @@ public class KafkaSinkITCase extends TestLogger { final DataStream<Long> stream = source.map(mapper); final KafkaSinkBuilder<Long> builder = new KafkaSinkBuilder<Long>() - .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()) .setRecordSerializer( KafkaRecordSerializationSchema.builder() @@ -365,7 +365,7 @@ public class KafkaSinkITCase extends TestLogger { stream.sinkTo( new KafkaSinkBuilder<Long>() - .setDeliverGuarantee(guarantee) + .setDeliveryGuarantee(guarantee) .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()) .setRecordSerializer( KafkaRecordSerializationSchema.builder() @@ -394,7 +394,7 @@ public class KafkaSinkITCase extends TestLogger { source.sinkTo( new KafkaSinkBuilder<Long>() .setBootstrapServers(KAFKA_CONTAINER.getBootstrapServers()) - .setDeliverGuarantee(deliveryGuarantee) + .setDeliveryGuarantee(deliveryGuarantee) .setRecordSerializer( KafkaRecordSerializationSchema.builder() .setTopic(topic) diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java index 7c287d23592..34e4dbff61d 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/testutils/KafkaSinkExternalContext.java @@ -141,7 +141,7 @@ public class KafkaSinkExternalContext implements DataStreamSinkV2ExternalContext properties.put( ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, DEFAULT_TRANSACTION_TIMEOUT_IN_MS); builder.setBootstrapServers(bootstrapServers) - .setDeliverGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode())) + .setDeliveryGuarantee(toDeliveryGuarantee(sinkSettings.getCheckpointingMode())) .setTransactionalIdPrefix("testingFramework") .setKafkaProducerConfig(properties) .setRecordSerializer( diff --git a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java index 85c4af68e2d..e8bc9e37326 100644 --- a/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java +++ b/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaChangelogTableITCase.java @@ -477,7 +477,7 @@ public class KafkaChangelogTableITCase extends KafkaTableTestBase { .setValueSerializationSchema(serSchema) .setPartitioner(partitioner) .build()) - .setDeliverGuarantee(DeliveryGuarantee.EXACTLY_ONCE) + .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE) .build()); env.execute("Write sequence"); }