This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f
Author: Jiabao Sun <jiabao....@xtransfer.cn>
AuthorDate: Thu Feb 8 23:16:44 2024 +0800

    [FLINK-34192] Update to be compatible with updated SinkV2 interfaces
---
 .github/workflows/push_pr.yml                      |   2 +
 flink-connector-kafka/pom.xml                      |   4 +
 .../connector/kafka/sink/KafkaWriterITCase.java    | 149 ++++++++++-----------
 .../kafka/table/KafkaTableTestUtils.java           |  16 ++-
 4 files changed, 91 insertions(+), 80 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index d57c0181..00e2f788 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -30,6 +30,8 @@ jobs:
         include:
           - flink: 1.18.1
             jdk: '8, 11, 17'
+          - flink: 1.19-SNAPSHOT
+            jdk: '8, 11, 17, 21'
     uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
     with:
       flink_version: ${{ matrix.flink }}
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 40d6a9f3..6510b9c8 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -144,6 +144,10 @@ under the License.
                     <groupId>org.slf4j</groupId>
                     <artifactId>slf4j-api</artifactId>
                 </exclusion>
+                <exclusion>
+                    <groupId>io.dropwizard.metrics</groupId>
+                    <artifactId>metrics-core</artifactId>
+                </exclusion>
             </exclusions>
             <scope>test</scope>
         </dependency>
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 41c26633..c9eceb98 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -58,7 +60,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -84,7 +85,7 @@ public class KafkaWriterITCase {
     private static final Network NETWORK = Network.newNetwork();
     private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
     private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
-    private String topic;
+    private static String topic;
 
     private MetricListener metricListener;
     private TriggerTimeService timeService;
@@ -130,11 +131,8 @@ public class KafkaWriterITCase {
 
     @Test
     public void testIncreasingRecordBasedCounters() throws Exception {
-        final OperatorIOMetricGroup operatorIOMetricGroup =
-                UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-        final InternalSinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(
-                        metricListener.getMetricGroup(), operatorIOMetricGroup);
+        final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) {
@@ -167,13 +165,9 @@ public class KafkaWriterITCase {
 
     @Test
     public void testCurrentSendTimeMetric() throws Exception {
-        final InternalSinkWriterMetricGroup metricGroup =
-                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
         try (final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        getKafkaClientConfiguration(),
-                        DeliveryGuarantee.AT_LEAST_ONCE,
-                        metricGroup)) {
+                        getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) {
             final Optional<Gauge<Long>> currentSendTime =
                     metricListener.getGauge("currentSendTime");
             assertThat(currentSendTime.isPresent()).isTrue();
@@ -199,16 +193,12 @@ public class KafkaWriterITCase {
     void testFlushAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
 
-        SinkInitContext sinkInitContext =
-                new SinkInitContext(
-                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
-                        timeService,
-                        null);
+        final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
         final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
-        final Counter numRecordsOutErrors =
-                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup);
+        final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
         triggerProducerException(writer, properties);
@@ -228,16 +218,12 @@ public class KafkaWriterITCase {
     void testWriteAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
 
-        SinkInitContext sinkInitContext =
-                new SinkInitContext(
-                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
-                        timeService,
-                        null);
+        final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
         final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
-        final Counter numRecordsOutErrors =
-                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup);
+        final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
         triggerProducerException(writer, properties);
@@ -259,10 +245,8 @@ public class KafkaWriterITCase {
         Properties properties = getKafkaClientConfiguration();
 
         SinkInitContext sinkInitContext =
-                new SinkInitContext(
-                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
-                        timeService,
-                        null);
+                new SinkInitContext(createSinkWriterMetricGroup(), timeService, null);
+
         final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
                         properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
@@ -289,16 +273,12 @@ public class KafkaWriterITCase {
     void testCloseAsyncErrorPropagationAndErrorCounter() throws Exception {
         Properties properties = getKafkaClientConfiguration();
 
-        SinkInitContext sinkInitContext =
-                new SinkInitContext(
-                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
-                        timeService,
-                        null);
+        final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
         final KafkaWriter<Integer> writer =
                 createWriterWithConfiguration(
-                        properties, DeliveryGuarantee.EXACTLY_ONCE, sinkInitContext);
-        final Counter numRecordsOutErrors =
-                sinkInitContext.metricGroup.getNumRecordsOutErrorsCounter();
+                        properties, DeliveryGuarantee.EXACTLY_ONCE, metricGroup);
+        final Counter numRecordsOutErrors = metricGroup.getNumRecordsOutErrorsCounter();
         assertThat(numRecordsOutErrors.getCount()).isEqualTo(0L);
 
         triggerProducerException(writer, properties);
@@ -334,7 +314,7 @@ public class KafkaWriterITCase {
                 createWriterWithConfiguration(
                         getKafkaClientConfiguration(),
                         DeliveryGuarantee.AT_LEAST_ONCE,
-                        InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()),
+                        createSinkWriterMetricGroup(),
                         meta -> metadataList.add(meta.toString()))) {
             List<String> expected = new ArrayList<>();
             for (int i = 0; i < 100; i++) {
@@ -518,17 +498,15 @@ public class KafkaWriterITCase {
     }
 
     private KafkaWriter<Integer> createWriterWithConfiguration(
-            Properties config, DeliveryGuarantee guarantee) {
-        return createWriterWithConfiguration(
-                config,
-                guarantee,
-                InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()));
+            Properties config, DeliveryGuarantee guarantee) throws IOException {
+        return createWriterWithConfiguration(config, guarantee, createSinkWriterMetricGroup());
     }
 
     private KafkaWriter<Integer> createWriterWithConfiguration(
             Properties config,
             DeliveryGuarantee guarantee,
-            SinkWriterMetricGroup sinkWriterMetricGroup) {
+            SinkWriterMetricGroup sinkWriterMetricGroup)
+            throws IOException {
         return createWriterWithConfiguration(config, guarantee, sinkWriterMetricGroup, null);
     }
 
@@ -536,27 +514,37 @@ public class KafkaWriterITCase {
             Properties config,
             DeliveryGuarantee guarantee,
             SinkWriterMetricGroup sinkWriterMetricGroup,
-            @Nullable Consumer<RecordMetadata> metadataConsumer) {
-        return new KafkaWriter<>(
-                guarantee,
-                config,
-                "test-prefix",
-                new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer),
-                new DummyRecordSerializer(),
-                new DummySchemaContext(),
-                Collections.emptyList());
+            @Nullable Consumer<RecordMetadata> metadataConsumer)
+            throws IOException {
+        KafkaSink<Integer> kafkaSink =
+                KafkaSink.<Integer>builder()
+                        .setKafkaProducerConfig(config)
+                        .setDeliveryGuarantee(guarantee)
+                        .setTransactionalIdPrefix("test-prefix")
+                        .setRecordSerializer(new DummyRecordSerializer())
+                        .build();
+        return (KafkaWriter<Integer>)
+                kafkaSink.createWriter(
+                        new SinkInitContext(sinkWriterMetricGroup, timeService, metadataConsumer));
     }
 
     private KafkaWriter<Integer> createWriterWithConfiguration(
-            Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext) {
-        return new KafkaWriter<>(
-                guarantee,
-                config,
-                "test-prefix",
-                sinkInitContext,
-                new DummyRecordSerializer(),
-                new DummySchemaContext(),
-                Collections.emptyList());
+            Properties config, DeliveryGuarantee guarantee, SinkInitContext sinkInitContext)
+            throws IOException {
+        KafkaSink<Integer> kafkaSink =
+                KafkaSink.<Integer>builder()
+                        .setKafkaProducerConfig(config)
+                        .setDeliveryGuarantee(guarantee)
+                        .setTransactionalIdPrefix("test-prefix")
+                        .setRecordSerializer(new DummyRecordSerializer())
+                        .build();
+        return (KafkaWriter<Integer>) kafkaSink.createWriter(sinkInitContext);
+    }
+
+    private SinkWriterMetricGroup createSinkWriterMetricGroup() {
+        DummyOperatorMetricGroup operatorMetricGroup =
+                new DummyOperatorMetricGroup(metricListener.getMetricGroup());
+        return InternalSinkWriterMetricGroup.wrap(operatorMetricGroup);
     }
 
     private static Properties getKafkaClientConfiguration() {
@@ -632,7 +620,7 @@ public class KafkaWriterITCase {
         }
     }
 
-    private class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
+    private static class DummyRecordSerializer implements KafkaRecordSerializationSchema<Integer> {
         @Override
         public ProducerRecord<byte[], byte[]> serialize(
                 Integer element, KafkaSinkContext context, Long timestamp) {
@@ -644,28 +632,33 @@ public class KafkaWriterITCase {
         }
     }
 
-    private static class DummySchemaContext implements SerializationSchema.InitializationContext {
-
+    private static class DummySinkWriterContext implements SinkWriter.Context {
         @Override
-        public MetricGroup getMetricGroup() {
-            throw new UnsupportedOperationException("Not implemented.");
+        public long currentWatermark() {
+            return 0;
         }
 
         @Override
-        public UserCodeClassLoader getUserCodeClassLoader() {
-            throw new UnsupportedOperationException("Not implemented.");
+        public Long timestamp() {
+            return null;
         }
     }
 
-    private static class DummySinkWriterContext implements SinkWriter.Context {
-        @Override
-        public long currentWatermark() {
-            return 0;
+    private static class DummyOperatorMetricGroup extends ProxyMetricGroup<MetricGroup>
+            implements OperatorMetricGroup {
+
+        private final OperatorIOMetricGroup operatorIOMetricGroup;
+
+        public DummyOperatorMetricGroup(MetricGroup parentMetricGroup) {
+            super(parentMetricGroup);
+            this.operatorIOMetricGroup =
+                    UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup()
+                            .getIOMetricGroup();
         }
 
         @Override
-        public Long timestamp() {
-            return null;
+        public OperatorIOMetricGroup getIOMetricGroup() {
+            return operatorIOMetricGroup;
         }
     }
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
index 793d8da7..e4a5ba62 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/table/KafkaTableTestUtils.java
@@ -41,6 +41,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.HamcrestCondition.matching;
@@ -98,8 +99,11 @@ public class KafkaTableTestUtils {
         Collections.sort(expected);
         CommonTestUtils.waitUtil(
                 () -> {
-                    List<String> actual = TestValuesTableFactory.getResults(sinkName);
-                    Collections.sort(actual);
+                    List<String> actual =
+                            TestValuesTableFactory.getResults(sinkName).stream()
+                                    .map(KafkaTableTestUtils::rowToString)
+                                    .sorted()
+                                    .collect(Collectors.toList());
                     return expected.equals(actual);
                 },
                 timeout,
@@ -124,4 +128,12 @@ public class KafkaTableTestUtils {
                             matching(TableTestMatchers.deepEqualTo(expectedData.get(key), false)));
         }
     }
+
+    private static String rowToString(Object o) {
+        if (o instanceof Row) {
+            return ((Row) o).toString();
+        } else {
+            return o.toString();
+        }
+    }
 }

Reply via email to