(flink-connector-kafka) 01/02: [FLINK-34192] Update to be compatible with updated SinkV2 interfaces
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch v3.1 in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git commit 186e72a2a71cda9ca1bd9ae45420b64611c10900 Author: Jiabao Sun AuthorDate: Thu Feb 8 23:16:44 2024 +0800 [FLINK-34192] Update to be compatible with updated SinkV2 interfaces (cherry picked from commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f) --- .github/workflows/push_pr.yml | 2 + flink-connector-kafka/pom.xml | 4 + .../connector/kafka/sink/KafkaWriterITCase.java| 149 ++--- .../kafka/table/KafkaTableTestUtils.java | 16 ++- 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index d57c0181..00e2f788 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -30,6 +30,8 @@ jobs: include: - flink: 1.18.1 jdk: '8, 11, 17' + - flink: 1.19-SNAPSHOT +jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 40d6a9f3..6510b9c8 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -144,6 +144,10 @@ under the License. org.slf4j slf4j-api + +io.dropwizard.metrics +metrics-core + test diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 41c26633..c9eceb98 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.UserCodeClassLoader; @@ -58,7 +60,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -84,7 +85,7 @@ public class KafkaWriterITCase { private static final Network NETWORK = Network.newNetwork(); private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total"; private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext(); -private String topic; +private static String topic; private MetricListener metricListener; private TriggerTimeService timeService; @@ -130,11 +131,8 @@ public class KafkaWriterITCase { @Test public void testIncreasingRecordBasedCounters() throws Exception { -final OperatorIOMetricGroup operatorIOMetricGroup = - UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); -final InternalSinkWriterMetricGroup metricGroup = -InternalSinkWriterMetricGroup.mock( -metricListener.getMetricGroup(), operatorIOMetricGroup); +final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup(); + try (final KafkaWriter writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { @@ -167,13 +165,9 @@ public class KafkaWriterITCase { @Test public void testCurrentSendTimeMetric() throws Exception { -final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); try (final KafkaWriter writer = createWriterWithConfiguration( -getKafkaClientConfiguration(), -DeliveryGuarantee.AT_LEAST_ONCE, -metricGroup)) { +getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) { final Optional>
(flink-connector-kafka) 01/02: [FLINK-34192] Update to be compatible with updated SinkV2 interfaces
This is an automated email from the ASF dual-hosted git repository. martijnvisser pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-kafka.git commit b8328ab55e2bcf026ef82e35cebbb1d867cfb18f Author: Jiabao Sun AuthorDate: Thu Feb 8 23:16:44 2024 +0800 [FLINK-34192] Update to be compatible with updated SinkV2 interfaces --- .github/workflows/push_pr.yml | 2 + flink-connector-kafka/pom.xml | 4 + .../connector/kafka/sink/KafkaWriterITCase.java| 149 ++--- .../kafka/table/KafkaTableTestUtils.java | 16 ++- 4 files changed, 91 insertions(+), 80 deletions(-) diff --git a/.github/workflows/push_pr.yml b/.github/workflows/push_pr.yml index d57c0181..00e2f788 100644 --- a/.github/workflows/push_pr.yml +++ b/.github/workflows/push_pr.yml @@ -30,6 +30,8 @@ jobs: include: - flink: 1.18.1 jdk: '8, 11, 17' + - flink: 1.19-SNAPSHOT +jdk: '8, 11, 17, 21' uses: apache/flink-connector-shared-utils/.github/workflows/ci.yml@ci_utils with: flink_version: ${{ matrix.flink }} diff --git a/flink-connector-kafka/pom.xml b/flink-connector-kafka/pom.xml index 40d6a9f3..6510b9c8 100644 --- a/flink-connector-kafka/pom.xml +++ b/flink-connector-kafka/pom.xml @@ -144,6 +144,10 @@ under the License. org.slf4j slf4j-api + +io.dropwizard.metrics +metrics-core + test diff --git a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java index 41c26633..c9eceb98 100644 --- a/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java +++ b/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java @@ -27,9 +27,11 @@ import org.apache.flink.metrics.Counter; import org.apache.flink.metrics.Gauge; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.metrics.groups.OperatorIOMetricGroup; +import org.apache.flink.metrics.groups.OperatorMetricGroup; import org.apache.flink.metrics.groups.SinkWriterMetricGroup; import org.apache.flink.metrics.testutils.MetricListener; import org.apache.flink.runtime.metrics.groups.InternalSinkWriterMetricGroup; +import org.apache.flink.runtime.metrics.groups.ProxyMetricGroup; import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; import org.apache.flink.util.TestLoggerExtension; import org.apache.flink.util.UserCodeClassLoader; @@ -58,7 +60,6 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; import java.util.Comparator; import java.util.List; import java.util.Optional; @@ -84,7 +85,7 @@ public class KafkaWriterITCase { private static final Network NETWORK = Network.newNetwork(); private static final String KAFKA_METRIC_WITH_GROUP_NAME = "KafkaProducer.incoming-byte-total"; private static final SinkWriter.Context SINK_WRITER_CONTEXT = new DummySinkWriterContext(); -private String topic; +private static String topic; private MetricListener metricListener; private TriggerTimeService timeService; @@ -130,11 +131,8 @@ public class KafkaWriterITCase { @Test public void testIncreasingRecordBasedCounters() throws Exception { -final OperatorIOMetricGroup operatorIOMetricGroup = - UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup().getIOMetricGroup(); -final InternalSinkWriterMetricGroup metricGroup = -InternalSinkWriterMetricGroup.mock( -metricListener.getMetricGroup(), operatorIOMetricGroup); +final SinkWriterMetricGroup metricGroup = createSinkWriterMetricGroup(); + try (final KafkaWriter writer = createWriterWithConfiguration( getKafkaClientConfiguration(), DeliveryGuarantee.NONE, metricGroup)) { @@ -167,13 +165,9 @@ public class KafkaWriterITCase { @Test public void testCurrentSendTimeMetric() throws Exception { -final InternalSinkWriterMetricGroup metricGroup = - InternalSinkWriterMetricGroup.mock(metricListener.getMetricGroup()); try (final KafkaWriter writer = createWriterWithConfiguration( -getKafkaClientConfiguration(), -DeliveryGuarantee.AT_LEAST_ONCE, -metricGroup)) { +getKafkaClientConfiguration(), DeliveryGuarantee.AT_LEAST_ONCE)) { final Optional> currentSendTime =