(flink-connector-kafka) 01/02: [FLINK-34192] Update to be compatible with updated SinkV2 interfaces

2024-02-13 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch v3.1
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit 186e72a2a71cda9ca1bd9ae45420b64611c10900
Author: Jiabao Sun 
AuthorDate: Thu Feb 8 23:16:44 2024 +0800

[FLINK-34192] Update to be compatible with updated SinkV2 interfaces

(cherry picked from commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f)
---
 .github/workflows/push_pr.yml  |   2 +
 flink-connector-kafka/pom.xml  |   4 +
 .../connector/kafka/sink/KafkaWriterITCase.java| 149 ++---
 .../kafka/table/KafkaTableTestUtils.java   |  16 ++-
 4 files changed, 91 insertions(+), 80 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index d57c0181..00e2f788 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -30,6 +30,8 @@ jobs:
 include:
   - flink: 1.18.1
 jdk: '8, 11, 17'
+  - flink: 1.19-SNAPSHOT
+jdk: '8, 11, 17, 21'
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
   flink_version: ${{ matrix.flink }}
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 40d6a9f3..6510b9c8 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -144,6 +144,10 @@ under the License.
 org.slf4j
 slf4j-api
 
+
+io.dropwizard.metrics
+metrics-core
+
 
 test
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 41c26633..c9eceb98 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -58,7 +60,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -84,7 +85,7 @@ public class KafkaWriterITCase {
 private static final Network NETWORK = Network.newNetwork();
 private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
 private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
-private String topic;
+private static String topic;
 
 private MetricListener metricListener;
 private TriggerTimeService timeService;
@@ -130,11 +131,8 @@ public class KafkaWriterITCase {
 
 @Test
 public void testIncreasingRecordBasedCounters() throws Exception {
-final OperatorIOMetricGroup operatorIOMetricGroup =
-UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-final InternalSinkWriterMetricGroup metricGroup =
-InternalSinkWriterMetricGroup.mock(
-metricListener.getMetricGroup(), operatorIOMetricGroup);
+final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
 try (final KafkaWriter writer =
 createWriterWithConfiguration(
 getKafkaClientConfiguration(), DeliveryGuarantee.NONE, 
metricGroup)) {
@@ -167,13 +165,9 @@ public class KafkaWriterITCase {
 
 @Test
 public void testCurrentSendTimeMetric() throws Exception {
-final InternalSinkWriterMetricGroup metricGroup =
-InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
 try (final KafkaWriter writer =
 createWriterWithConfiguration(
-getKafkaClientConfiguration(),
-DeliveryGuarantee.AT_LEAST_ONCE,
-metricGroup)) {
+getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) {
 final Optional> currentSendTime =

(flink-connector-kafka) 01/02: [FLINK-34192] Update to be compatible with updated SinkV2 interfaces

2024-02-12 Thread martijnvisser
This is an automated email from the ASF dual-hosted git repository.

martijnvisser pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git

commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f
Author: Jiabao Sun 
AuthorDate: Thu Feb 8 23:16:44 2024 +0800

[FLINK-34192] Update to be compatible with updated SinkV2 interfaces
---
 .github/workflows/push_pr.yml  |   2 +
 flink-connector-kafka/pom.xml  |   4 +
 .../connector/kafka/sink/KafkaWriterITCase.java| 149 ++---
 .../kafka/table/KafkaTableTestUtils.java   |  16 ++-
 4 files changed, 91 insertions(+), 80 deletions(-)

diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml
index d57c0181..00e2f788 100644
--- a/.github/workflows/push_pr.yml
+++ b/.github/workflows/push_pr.yml
@@ -30,6 +30,8 @@ jobs:
 include:
   - flink: 1.18.1
 jdk: '8, 11, 17'
+  - flink: 1.19-SNAPSHOT
+jdk: '8, 11, 17, 21'
 uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils
 with:
   flink_version: ${{ matrix.flink }}
diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml
index 40d6a9f3..6510b9c8 100644
--- a/flink-connector-kafka/pom.xml
+++ b/flink-connector-kafka/pom.xml
@@ -144,6 +144,10 @@ under the License.
 org.slf4j
 slf4j-api
 
+
+io.dropwizard.metrics
+metrics-core
+
 
 test
 
diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
index 41c26633..c9eceb98 100644
--- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
+++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java
@@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.metrics.groups.OperatorIOMetricGroup;
+import org.apache.flink.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
 import org.apache.flink.metrics.testutils.MetricListener;
 import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup;
+import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
 import org.apache.flink.util.TestLoggerExtension;
 import org.apache.flink.util.UserCodeClassLoader;
@@ -58,7 +60,6 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Optional;
@@ -84,7 +85,7 @@ public class KafkaWriterITCase {
 private static final Network NETWORK = Network.newNetwork();
 private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total";
 private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext();
-private String topic;
+private static String topic;
 
 private MetricListener metricListener;
 private TriggerTimeService timeService;
@@ -130,11 +131,8 @@ public class KafkaWriterITCase {
 
 @Test
 public void testIncreasingRecordBasedCounters() throws Exception {
-final OperatorIOMetricGroup operatorIOMetricGroup =
-UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup();
-final InternalSinkWriterMetricGroup metricGroup =
-InternalSinkWriterMetricGroup.mock(
-metricListener.getMetricGroup(), operatorIOMetricGroup);
+final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup();
+
 try (final KafkaWriter writer =
 createWriterWithConfiguration(
 getKafkaClientConfiguration(), DeliveryGuarantee.NONE, 
metricGroup)) {
@@ -167,13 +165,9 @@ public class KafkaWriterITCase {
 
 @Test
 public void testCurrentSendTimeMetric() throws Exception {
-final InternalSinkWriterMetricGroup metricGroup =
-InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup());
 try (final KafkaWriter writer =
 createWriterWithConfiguration(
-getKafkaClientConfiguration(),
-DeliveryGuarantee.AT_LEAST_ONCE,
-metricGroup)) {
+getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) {
 final Optional> currentSendTime =