This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git


The following commit(s) were added to refs/heads/main by this push:
     new ccc4b87  [FLINK-35548] Add E2E tests for PubSubSinkV2
ccc4b87 is described below

commit ccc4b87ec8e36f498aaf002cc7a399ffbe762840
Author: Ahmed Hamdy <ahmed.ha...@ververica.com>
AuthorDate: Tue Jun 25 14:54:50 2024 +0300

    [FLINK-35548] Add E2E tests for PubSubSinkV2
---
 flink-connector-gcp-pubsub-e2e-tests/pom.xml       |  12 ++
 .../gcp/pubsub/sink/PubSubSinkV2ITTests.java       | 166 +++++++++++++++++++++
 .../gcp/pubsub/sink/util}/PubsubHelper.java        |  65 +++++++-
 .../gcp/pubsub/sink/util/TestChannelProvider.java  |  26 ++++
 .../gcp/pubsub/CheckPubSubEmulatorTest.java        |   2 +-
 .../gcp/pubsub/EmulatedFullTopologyTest.java       |   2 +-
 .../gcp/pubsub/EmulatedPubSubSinkTest.java         |   2 +-
 .../gcp/pubsub/EmulatedPubSubSourceTest.java       |   2 +-
 .../gcp/pubsub/emulator/GCloudUnitTestBase.java    |   1 +
 .../gcp/pubsub/sink/config/GcpPublisherConfig.java |   1 +
 .../SerializableTransportChannelProvider.java      |   1 +
 .../emulator/EmulatorCredentialsProvider.java      |   6 +-
 12 files changed, 274 insertions(+), 12 deletions(-)

diff --git a/flink-connector-gcp-pubsub-e2e-tests/pom.xml 
b/flink-connector-gcp-pubsub-e2e-tests/pom.xml
index 6b69090..41b11f0 100644
--- a/flink-connector-gcp-pubsub-e2e-tests/pom.xml
+++ b/flink-connector-gcp-pubsub-e2e-tests/pom.xml
@@ -57,6 +57,12 @@ under the License.
                        <version>${project.version}</version>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-base</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
@@ -73,6 +79,12 @@ under the License.
                        <type>test-jar</type>
                        <scope>test</scope>
                </dependency>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-test-utils</artifactId>
+                       <version>${flink.version}</version>
+                       <scope>test</scope>
+               </dependency>
 
                <dependency>
                        <groupId>org.apache.flink</groupId>
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java
new file mode 100644
index 0000000..41515dc
--- /dev/null
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java
@@ -0,0 +1,166 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import 
org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction;
+import org.apache.flink.connector.datagen.source.DataGeneratorSource;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
+import org.apache.flink.test.junit5.MiniClusterExtension;
+import org.apache.flink.util.ExceptionUtils;
+
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.pubsub.v1.ReceivedMessage;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+import org.testcontainers.containers.PubSubEmulatorContainer;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+import org.testcontainers.utility.DockerImageName;
+import org.threeten.bp.Duration;
+
+import java.io.IOException;
+import java.net.UnknownHostException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Integration tests for {@link PubSubSinkV2} using {@link 
PubSubEmulatorContainer}. */
+@ExtendWith(MiniClusterExtension.class)
+@Execution(ExecutionMode.CONCURRENT)
+@Testcontainers
+class PubSubSinkV2ITTests {
+
+    private static final String PROJECT_ID = "test-project";
+
+    private static final String TOPIC_ID = "test-topic";
+
+    private static final String SUBSCRIPTION_ID = "test-subscription";
+
+    private StreamExecutionEnvironment env;
+
+    @Container
+    private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER =
+            new PubSubEmulatorContainer(
+                    
DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR));
+
+    private PubsubHelper pubSubHelper;
+
+    @BeforeEach
+    void setUp() throws IOException {
+        env = StreamExecutionEnvironment.getExecutionEnvironment();
+        pubSubHelper = new 
PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint());
+
+        pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, 
PROJECT_ID, TOPIC_ID);
+    }
+
+    @AfterEach
+    void tearDown() throws IOException {
+        pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID);
+        pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID);
+        pubSubHelper.close();
+    }
+
+    @Test
+    void pubSubSinkV2DeliversRecords() throws Exception {
+        String[] elements = new String[] {"test1", "test2", "test3"};
+        DataStreamSource<String> stream =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                new FromElementsGeneratorFunction<>(
+                                        BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+                                elements.length,
+                                TypeInformation.of(String.class)),
+                        WatermarkStrategy.noWatermarks(),
+                        "DataGeneratorSource");
+
+        GcpPublisherConfig gcpPublisherConfig =
+                GcpPublisherConfig.builder()
+                        
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+                        .setTransportChannelProvider(
+                                new TestChannelProvider(
+                                        
PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()))
+                        .build();
+
+        PubSubSinkV2<String> sink =
+                PubSubSinkV2.<String>builder()
+                        .setProjectId(PROJECT_ID)
+                        .setTopicId(TOPIC_ID)
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setGcpPublisherConfig(gcpPublisherConfig)
+                        .setFailOnError(true)
+                        .build();
+        stream.sinkTo(sink);
+        int maxNumberOfMessages = elements.length;
+        env.execute("PubSubSinkV2ITTests");
+        List<ReceivedMessage> receivedMessages =
+                pubSubHelper.pullMessages(PROJECT_ID, SUBSCRIPTION_ID, 
maxNumberOfMessages);
+
+        assertThat(receivedMessages).hasSameSizeAs(elements);
+        assertThat(receivedMessages)
+                .extracting(ReceivedMessage::getMessage)
+                .extracting(message -> message.getData().toStringUtf8())
+                .containsExactlyInAnyOrder(elements);
+    }
+
+    @Test
+    void pubSubSinkV2PropagatesException() throws Exception {
+        String[] elements = new String[] {"test1", "test2", "test3"};
+        DataStreamSource<String> stream =
+                env.fromSource(
+                        new DataGeneratorSource<>(
+                                new FromElementsGeneratorFunction<>(
+                                        BasicTypeInfo.STRING_TYPE_INFO, 
elements),
+                                elements.length,
+                                TypeInformation.of(String.class)),
+                        WatermarkStrategy.noWatermarks(),
+                        "DataGeneratorSource");
+
+        GcpPublisherConfig gcpPublisherConfig =
+                GcpPublisherConfig.builder()
+                        
.setCredentialsProvider(EmulatorCredentialsProvider.create())
+                        .setTransportChannelProvider(new 
TestChannelProvider("bad-endpoint:1234"))
+                        .setRetrySettings(
+                                RetrySettings.newBuilder()
+                                        .setMaxAttempts(1)
+                                        
.setTotalTimeout(Duration.ofSeconds(10))
+                                        
.setInitialRetryDelay(Duration.ofMillis(100))
+                                        .setRetryDelayMultiplier(1.3)
+                                        
.setMaxRetryDelay(Duration.ofSeconds(5))
+                                        
.setInitialRpcTimeout(Duration.ofSeconds(5))
+                                        .setRpcTimeoutMultiplier(1)
+                                        
.setMaxRpcTimeout(Duration.ofSeconds(10))
+                                        .build())
+                        .build();
+
+        PubSubSinkV2<String> sink =
+                PubSubSinkV2.<String>builder()
+                        .setProjectId(PROJECT_ID)
+                        .setTopicId(TOPIC_ID)
+                        .setSerializationSchema(new SimpleStringSchema())
+                        .setGcpPublisherConfig(gcpPublisherConfig)
+                        .setFailOnError(true)
+                        .build();
+        stream.sinkTo(sink);
+        Assertions.assertThatExceptionOfType(JobExecutionException.class)
+                .isThrownBy(() -> env.execute("PubSubSinkV2ITTests"))
+                .satisfies(
+                        e ->
+                                ExceptionUtils.findThrowable(e, 
UnknownHostException.class)
+                                        .isPresent());
+    }
+}
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java
similarity index 82%
rename from 
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
rename to 
flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java
index 99ce120..4b6ce1e 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java
@@ -15,8 +15,12 @@
  * limitations under the License.
  */
 
-package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;
+package org.apache.flink.connector.gcp.pubsub.sink.util;
 
+import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
 import com.google.api.gax.rpc.NotFoundException;
 import com.google.api.gax.rpc.TransportChannelProvider;
 import com.google.cloud.pubsub.v1.MessageReceiver;
@@ -36,11 +40,15 @@ import com.google.pubsub.v1.PushConfig;
 import com.google.pubsub.v1.ReceivedMessage;
 import com.google.pubsub.v1.Topic;
 import com.google.pubsub.v1.TopicName;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.time.Duration;
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
 
 /** A helper class to make managing the testing topics a bit easier. */
@@ -48,15 +56,34 @@ public class PubsubHelper {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(PubsubHelper.class);
 
+    private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5);
+
+    private ManagedChannel channel;
+
     private TransportChannelProvider channelProvider;
 
     private TopicAdminClient topicClient;
+
     private SubscriptionAdminClient subscriptionAdminClient;
 
+    public PubsubHelper(String endpoint) {
+        channel = 
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+        channelProvider =
+                
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
+    }
+
     public PubsubHelper(TransportChannelProvider channelProvider) {
         this.channelProvider = channelProvider;
     }
 
+    public TransportChannelProvider getChannelProvider() {
+        return channelProvider;
+    }
+
+    public ManagedChannel getChannel() {
+        return channel;
+    }
+
     public TopicAdminClient getTopicAdminClient() throws IOException {
         if (topicClient == null) {
             TopicAdminSettings topicAdminSettings =
@@ -90,12 +117,6 @@ public class PubsubHelper {
             return;
         }
 
-        // If it exists we delete all subscriptions and the topic itself.
-        LOG.info("DeleteTopic {} first delete old subscriptions.", topicName);
-        adminClient
-                .listTopicSubscriptions(topicName)
-                .iterateAll()
-                .forEach(subscriptionAdminClient::deleteSubscription);
         LOG.info("DeleteTopic {}", topicName);
         adminClient.deleteTopic(topicName);
     }
@@ -222,4 +243,34 @@ public class PubsubHelper {
                 .setCredentialsProvider(EmulatorCredentialsProvider.create())
                 .build();
     }
+
+    public void close() {
+        if (topicClient != null) {
+            try {
+                topicClient.shutdown();
+                topicClient.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), 
TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("Error shutting down topic client", e);
+            }
+        }
+
+        if (subscriptionAdminClient != null) {
+            try {
+                subscriptionAdminClient.shutdown();
+                subscriptionAdminClient.awaitTermination(
+                        SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("Error shutting down subscription admin client", e);
+            }
+        }
+
+        if (channel != null) {
+            try {
+                channel.shutdown();
+                channel.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), 
TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                LOG.warn("Error shutting down channel", e);
+            }
+        }
+    }
 }
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java
new file mode 100644
index 0000000..b33f601
--- /dev/null
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java
@@ -0,0 +1,26 @@
+package org.apache.flink.connector.gcp.pubsub.sink.util;
+
+import 
org.apache.flink.connector.gcp.pubsub.sink.config.SerializableTransportChannelProvider;
+
+import com.google.api.gax.grpc.GrpcTransportChannel;
+import com.google.api.gax.rpc.FixedTransportChannelProvider;
+import io.grpc.ManagedChannel;
+import io.grpc.ManagedChannelBuilder;
+
+/** A test channel provider for {@link GrpcTransportChannel}. */
+public class TestChannelProvider extends SerializableTransportChannelProvider {
+
+    private final String endpoint;
+
+    public TestChannelProvider(String endpoint) {
+        this.endpoint = endpoint;
+    }
+
+    @Override
+    protected void open() {
+        ManagedChannel managedChannel =
+                
ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build();
+        this.transportChannelProvider =
+                
FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel));
+    }
+}
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
index b66bea6..22ea754 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java
@@ -17,8 +17,8 @@
 
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
-import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
 
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.cloud.pubsub.v1.Subscriber;
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java
index a186971..f84320f 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java
@@ -20,11 +20,11 @@ package org.apache.flink.streaming.connectors.gcp.pubsub;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
-import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
 
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.protobuf.ByteString;
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
index e1aa486..692a195 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java
@@ -20,12 +20,12 @@ package org.apache.flink.streaming.connectors.gcp.pubsub;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
-import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
 
 import com.google.pubsub.v1.ReceivedMessage;
 import org.apache.commons.lang3.StringUtils;
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
index 58597fd..4fbfbdc 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java
@@ -18,13 +18,13 @@
 package org.apache.flink.streaming.connectors.gcp.pubsub;
 
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator;
-import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper;
 
 import com.google.cloud.pubsub.v1.Publisher;
 import com.google.protobuf.ByteString;
diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
index c26554e..cc0c366 100644
--- 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
+++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java
@@ -17,6 +17,7 @@
 
 package org.apache.flink.streaming.connectors.gcp.pubsub.emulator;
 
+import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper;
 import 
org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions;
 import org.apache.flink.util.TestLogger;
 
diff --git 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java
 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java
index e4bebd2..f1449af 100644
--- 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java
+++ 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java
@@ -13,6 +13,7 @@ import java.io.Serializable;
 /** Configuration keys for {@link com.google.cloud.pubsub.v1.Publisher}. */
 @PublicEvolving
 public class GcpPublisherConfig implements Serializable {
+    private static final long serialVersionUID = 1L;
 
     private final RetrySettings retrySettings;
 
diff --git 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java
 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java
index 97fc189..4589598 100644
--- 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java
+++ 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java
@@ -12,6 +12,7 @@ import java.io.Serializable;
  */
 @PublicEvolving
 public abstract class SerializableTransportChannelProvider implements 
Serializable {
+    private static final long serialVersionUID = 1L;
 
     protected transient TransportChannelProvider transportChannelProvider;
 
diff --git 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java
 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java
index 5ea454e..42849e4 100644
--- 
a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java
+++ 
b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java
@@ -20,12 +20,16 @@ package 
org.apache.flink.streaming.connectors.gcp.pubsub.emulator;
 import com.google.api.gax.core.CredentialsProvider;
 import com.google.auth.Credentials;
 
+import java.io.Serializable;
+
 /**
  * A CredentialsProvider that simply provides the right credentials that are 
to be used for
  * connecting to an emulator. NOTE: The Google provided NoCredentials and 
NoCredentialsProvider do
  * not behave as expected. See 
https://github.com/googleapis/gax-java/issues/1148
  */
-public final class EmulatorCredentialsProvider implements CredentialsProvider {
+public final class EmulatorCredentialsProvider implements CredentialsProvider, 
Serializable {
+    private static final long serialVersionUID = 1L;
+
     @Override
     public Credentials getCredentials() {
         return EmulatorCredentials.getInstance();

Reply via email to