This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git
commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f Author: Jiabao Sun <jiabao....@xtransfer.cn> AuthorDate: Thu Feb 8 23:16:44 2024 +0800 [FLINK-34192] Update to be compatible with updated SinkV2 interfaces --- .github/workflows/push_pr.yml | 2 + flink-connector-kafka/pom.xml | 4 + .../connector/kafka/sink/KafkaWriterITCase.java | 149 ++++++++++----------- .../kafka/table/KafkaTableTestUtils.java | 16 ++- 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index d57c0181..00e2f788 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -30,6 +30,8 @@ jobs: include: - flink: 1.18.1 jdk: '8, 11, 17' + - flink: 1.19-SNAPSHOT + jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 40d6a9f3..6510b9c8 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -144,6 +144,10 @@ under the License. 
<groupId>org.slf4j</groupId> <artifactId>slf4j-api</artifactId> </exclusion> + <exclusion> + <groupId>io.dropwizard.metrics</groupId> + <artifactId>metrics-core</artifactId> + </exclusion> </exclusions> <scope>test</scope> </dependency> diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 41c26633..c9eceb98 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.UserCodeClassLoader; @@ -58,7 +60,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -84,7 +85,7 @@ public class KafkaWriterITCase { private static final Network NETWORK = Network.newNetwork(); private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total"; private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext(); - private String topic; + private static String topic; private 
MetricListener metricListener; private TriggerTimeService timeService; @@ -130,11 +131,8 @@ public class KafkaWriterITCase { @Test public void testIncreasingRecordBasedCounters() throws Exception { - final OperatorIOMetricGroup operatorIOMetricGroup = - UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); - final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock( - metricListener.getMetricGroup(), operatorIOMetricGroup); + final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup(); + try (final KafkaWriter<Integer> writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { @@ -167,13 +165,9 @@ public class KafkaWriterITCase { @Test public void testCurrentSendTimeMetric() throws Exception { - final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); try (final KafkaWriter<Integer> writer = createWriterWithConfiguration( - getKafkaClientConfiguration(), - DeliveryGuarantee.AT_LEAST_ONCE, - metricGroup)) { + getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) { final Optional<Gauge<Long>> currentSendTime = metricListener.getGauge("currentSendTime"); assertThat(currentSendTime.isPresent()).isTrue(); @@ -199,16 +193,12 @@ public class KafkaWriterITCase { void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); - SinkInitContext sinkInitContext = - new SinkInitContext( - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - timeService, - null); + final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup(); + final KafkaWriter<Integer> writer = createWriterWithConfiguration( - properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); - final Counter numRecordsOutErrors = - sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + properties, 
DeliveryGuarantee.EXACTLY_ONCE, metricGroup); + final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); triggerProducerException(writer, properties); @@ -228,16 +218,12 @@ public class KafkaWriterITCase { void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); - SinkInitContext sinkInitContext = - new SinkInitContext( - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - timeService, - null); + final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup(); + final KafkaWriter<Integer> writer = createWriterWithConfiguration( - properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); - final Counter numRecordsOutErrors = - sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup); + final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); triggerProducerException(writer, properties); @@ -259,10 +245,8 @@ public class KafkaWriterITCase { Properties properties = getKafkaClientConfiguration(); SinkInitContext sinkInitContext = - new SinkInitContext( - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - timeService, - null); + new SinkInitContext(createSinkWriterMetricGroup(), timeService, null); + final KafkaWriter<Integer> writer = createWriterWithConfiguration( properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); @@ -289,16 +273,12 @@ public class KafkaWriterITCase { void testCloseAsyncErrorPropagationAndErrorCounter() throws Exception { Properties properties = getKafkaClientConfiguration(); - SinkInitContext sinkInitContext = - new SinkInitContext( - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), - timeService, - null); + final SinkWriterMetricGroup metricGroup = 
createSinkWriterMetricGroup(); + final KafkaWriter<Integer> writer = createWriterWithConfiguration( - properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext); - final Counter numRecordsOutErrors = - sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter(); + properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup); + final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter(); assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L); triggerProducerException(writer, properties); @@ -334,7 +314,7 @@ public class KafkaWriterITCase { createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()), + createSinkWriterMetricGroup(), meta -> metadataList.add(meta.toString()))) { List<String> expected = new ArrayList<>(); for (int i = 0; i < 100; i++) { @@ -518,17 +498,15 @@ public class KafkaWriterITCase { } private KafkaWriter<Integer> createWriterWithConfiguration( - Properties config, DeliveryGuarantee guarantee) { - return createWriterWithConfiguration( - config, - guarantee, - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup())); + Properties config, DeliveryGuarantee guarantee) throws IOException { + return createWriterWithConfiguration(config, guarantee, createSinkWriterMetricGroup()); } private KafkaWriter<Integer> createWriterWithConfiguration( Properties config, DeliveryGuarantee guarantee, - SinkWriterMetricGroup sinkWriterMetricGroup) { + SinkWriterMetricGroup sinkWriterMetricGroup) + throws IOException { return createWriterWithConfiguration(config, guarantee, sinkWriterMetricGroup, null); } @@ -536,27 +514,37 @@ public class KafkaWriterITCase { Properties config, DeliveryGuarantee guarantee, SinkWriterMetricGroup sinkWriterMetricGroup, - @Nullable Consumer<RecordMetadata> metadataConsumer) { - return new KafkaWriter<>( - guarantee, - config, - "test-prefix", - new SinkInitContext(sinkWriterMetricGroup, timeService, 
metadataConsumer), - new DummyRecordSerializer(), - new DummySchemaContext(), - Collections.emptyList()); + @Nullable Consumer<RecordMetadata> metadataConsumer) + throws IOException { + KafkaSink<Integer> kafkaSink = + KafkaSink.<Integer>builder() + .setKafkaProducerConfig(config) + .setDeliveryGuarantee(guarantee) + .setTransactionalIdPrefix("test-prefix") + .setRecordSerializer(new DummyRecordSerializer()) + .build(); + return (KafkaWriter<Integer>) + kafkaSink.createWriter( + new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer)); } private KafkaWriter<Integer> createWriterWithConfiguration( - Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext) { - return new KafkaWriter<>( - guarantee, - config, - "test-prefix", - sinkInitContext, - new DummyRecordSerializer(), - new DummySchemaContext(), - Collections.emptyList()); + Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext) + throws IOException { + KafkaSink<Integer> kafkaSink = + KafkaSink.<Integer>builder() + .setKafkaProducerConfig(config) + .setDeliveryGuarantee(guarantee) + .setTransactionalIdPrefix("test-prefix") + .setRecordSerializer(new DummyRecordSerializer()) + .build(); + return (KafkaWriter<Integer>) kafkaSink.createWriter(sinkInitContext); + } + + private SinkWriterMetricGroup createSinkWriterMetricGroup() { + DummyOperatorMetricGroup operatorMetricGroup = + new DummyOperatorMetricGroup(metricListener.getMetricGroup()); + return InternalSinkWriterMetricGroup.wrap(operatorMetricGroup); } private static Properties getKafkaClientConfiguration() { @@ -632,7 +620,7 @@ public class KafkaWriterITCase { } } - private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> { + private static class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> { @Override public ProducerRecord<byte[], byte[]> serialize( Integer element, KafkaSinkContext context, Long timestamp) { @@ -644,28 +632,33 @@ 
public class KafkaWriterITCase { } } - private static class DummySchemaContext implements SerializationSchema.InitializationContext { - + private static class DummySinkWriterContext implements SinkWriter.Context { @Override - public MetricGroup getMetricGroup() { - throw new UnsupportedOperationException("Not implemented."); + public long currentWatermark() { + return 0; } @Override - public UserCodeClassLoader getUserCodeClassLoader() { - throw new UnsupportedOperationException("Not implemented."); + public Long timestamp() { + return null; } } - private static class DummySinkWriterContext implements SinkWriter.Context { - @Override - public long currentWatermark() { - return 0; + private static class DummyOperatorMetricGroup extends ProxyMetricGroup<MetricGroup> + implements OperatorMetricGroup { + + private final OperatorIOMetricGroup operatorIOMetricGroup; + + public DummyOperatorMetricGroup(MetricGroup parentMetricGroup) { + super(parentMetricGroup); + this.operatorIOMetricGroup = + UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup() + .getIOMetricGroup(); } @Override - public Long timestamp() { - return null; + public OperatorIOMetricGroup getIOMetricGroup() { + return operatorIOMetricGroup; } } diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java index 793d8da7..e4a5ba62 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java @@ -41,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.stream.Collectors; import static org.assertj.core.api.Assertions.assertThat; import static 
org.assertj.core.api.HamcrestCondition.matching; @@ -98,8 +99,11 @@ public class KafkaTableTestUtils { Collections.sort(expected); CommonTestUtils.waitUtil( () -> { - List<String> actual = TestValuesTableFactory.getResults(sinkName); - Collections.sort(actual); + List<String> actual = + TestValuesTableFactory.getResults(sinkName).stream() + .map(KafkaTableTestUtils::rowToString) + .sorted() + .collect(Collectors.toList()); return expected.equals(actual); }, timeout, @@ -124,4 +128,12 @@ public class KafkaTableTestUtils { matching(TableTestMatchers.deepEqualTo(expectedData.get(key), false))); } } + + private static String rowToString(Object o) { + if (o instanceof Row) { + return ((Row) o).toString(); + } else { + return o.toString(); + } + } }