This is an automated email from the ASF dual-hosted git repository. snuyanzin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/flink-connector-gcp-pubsub.git
The following commit(s) were added to refs/heads/main by this push: new ccc4b87 [FLINK-35548] Add E2E tests for PubSubSinkV2 ccc4b87 is described below commit ccc4b87ec8e36f498aaf002cc7a399ffbe762840 Author: Ahmed Hamdy <ahmed.ha...@ververica.com> AuthorDate: Tue Jun 25 14:54:50 2024 +0300 [FLINK-35548] Add E2E tests for PubSubSinkV2 --- flink-connector-gcp-pubsub-e2e-tests/pom.xml | 12 ++ .../gcp/pubsub/sink/PubSubSinkV2ITTests.java | 166 +++++++++++++++++++++ .../gcp/pubsub/sink/util}/PubsubHelper.java | 65 +++++++- .../gcp/pubsub/sink/util/TestChannelProvider.java | 26 ++++ .../gcp/pubsub/CheckPubSubEmulatorTest.java | 2 +- .../gcp/pubsub/EmulatedFullTopologyTest.java | 2 +- .../gcp/pubsub/EmulatedPubSubSinkTest.java | 2 +- .../gcp/pubsub/EmulatedPubSubSourceTest.java | 2 +- .../gcp/pubsub/emulator/GCloudUnitTestBase.java | 1 + .../gcp/pubsub/sink/config/GcpPublisherConfig.java | 1 + .../SerializableTransportChannelProvider.java | 1 + .../emulator/EmulatorCredentialsProvider.java | 6 +- 12 files changed, 274 insertions(+), 12 deletions(-) diff --git a/flink-connector-gcp-pubsub-e2e-tests/pom.xml b/flink-connector-gcp-pubsub-e2e-tests/pom.xml index 6b69090..41b11f0 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/pom.xml +++ b/flink-connector-gcp-pubsub-e2e-tests/pom.xml @@ -57,6 +57,12 @@ under the License. <version>${project.version}</version> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-connector-base</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> @@ -73,6 +79,12 @@ under the License. 
<type>test-jar</type> <scope>test</scope> </dependency> + <dependency> + <groupId>org.apache.flink</groupId> + <artifactId>flink-test-utils</artifactId> + <version>${flink.version}</version> + <scope>test</scope> + </dependency> <dependency> <groupId>org.apache.flink</groupId> diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java new file mode 100644 index 0000000..41515dc --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2ITTests.java @@ -0,0 +1,166 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.eventtime.WatermarkStrategy; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.common.typeinfo.BasicTypeInfo; +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.connector.datagen.functions.FromElementsGeneratorFunction; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestChannelProvider; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; +import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; +import org.apache.flink.test.junit5.MiniClusterExtension; +import org.apache.flink.util.ExceptionUtils; + +import com.google.api.gax.retrying.RetrySettings; +import 
com.google.pubsub.v1.ReceivedMessage; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.testcontainers.containers.PubSubEmulatorContainer; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; +import org.testcontainers.utility.DockerImageName; +import org.threeten.bp.Duration; + +import java.io.IOException; +import java.net.UnknownHostException; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Integration tests for {@link PubSubSinkV2} using {@link PubSubEmulatorContainer}. */ +@ExtendWith(MiniClusterExtension.class) +@Execution(ExecutionMode.CONCURRENT) +@Testcontainers +class PubSubSinkV2ITTests { + + private static final String PROJECT_ID = "test-project"; + + private static final String TOPIC_ID = "test-topic"; + + private static final String SUBSCRIPTION_ID = "test-subscription"; + + private StreamExecutionEnvironment env; + + @Container + private static final PubSubEmulatorContainer PUB_SUB_EMULATOR_CONTAINER = + new PubSubEmulatorContainer( + DockerImageName.parse(DockerImageVersions.GOOGLE_CLOUD_PUBSUB_EMULATOR)); + + private PubsubHelper pubSubHelper; + + @BeforeEach + void setUp() throws IOException { + env = StreamExecutionEnvironment.getExecutionEnvironment(); + pubSubHelper = new PubsubHelper(PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint()); + + pubSubHelper.createTopic(PROJECT_ID, TOPIC_ID); + pubSubHelper.createSubscription(PROJECT_ID, SUBSCRIPTION_ID, PROJECT_ID, TOPIC_ID); + } + + @AfterEach + void tearDown() throws IOException { + pubSubHelper.deleteSubscription(PROJECT_ID, SUBSCRIPTION_ID); + pubSubHelper.deleteTopic(PROJECT_ID, TOPIC_ID); + pubSubHelper.close(); + } + + 
@Test + void pubSubSinkV2DeliversRecords() throws Exception { + String[] elements = new String[] {"test1", "test2", "test3"}; + DataStreamSource<String> stream = + env.fromSource( + new DataGeneratorSource<>( + new FromElementsGeneratorFunction<>( + BasicTypeInfo.STRING_TYPE_INFO, elements), + elements.length, + TypeInformation.of(String.class)), + WatermarkStrategy.noWatermarks(), + "DataGeneratorSource"); + + GcpPublisherConfig gcpPublisherConfig = + GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) + .setTransportChannelProvider( + new TestChannelProvider( + PUB_SUB_EMULATOR_CONTAINER.getEmulatorEndpoint())) + .build(); + + PubSubSinkV2<String> sink = + PubSubSinkV2.<String>builder() + .setProjectId(PROJECT_ID) + .setTopicId(TOPIC_ID) + .setSerializationSchema(new SimpleStringSchema()) + .setGcpPublisherConfig(gcpPublisherConfig) + .setFailOnError(true) + .build(); + stream.sinkTo(sink); + int maxNumberOfMessages = elements.length; + env.execute("PubSubSinkV2ITTests"); + List<ReceivedMessage> receivedMessages = + pubSubHelper.pullMessages(PROJECT_ID, SUBSCRIPTION_ID, maxNumberOfMessages); + + assertThat(receivedMessages).hasSameSizeAs(elements); + assertThat(receivedMessages) + .extracting(ReceivedMessage::getMessage) + .extracting(message -> message.getData().toStringUtf8()) + .containsExactlyInAnyOrder(elements); + } + + @Test + void pubSubSinkV2PropagatesException() throws Exception { + String[] elements = new String[] {"test1", "test2", "test3"}; + DataStreamSource<String> stream = + env.fromSource( + new DataGeneratorSource<>( + new FromElementsGeneratorFunction<>( + BasicTypeInfo.STRING_TYPE_INFO, elements), + elements.length, + TypeInformation.of(String.class)), + WatermarkStrategy.noWatermarks(), + "DataGeneratorSource"); + + GcpPublisherConfig gcpPublisherConfig = + GcpPublisherConfig.builder() + .setCredentialsProvider(EmulatorCredentialsProvider.create()) + .setTransportChannelProvider(new 
TestChannelProvider("bad-endpoint:1234")) + .setRetrySettings( + RetrySettings.newBuilder() + .setMaxAttempts(1) + .setTotalTimeout(Duration.ofSeconds(10)) + .setInitialRetryDelay(Duration.ofMillis(100)) + .setRetryDelayMultiplier(1.3) + .setMaxRetryDelay(Duration.ofSeconds(5)) + .setInitialRpcTimeout(Duration.ofSeconds(5)) + .setRpcTimeoutMultiplier(1) + .setMaxRpcTimeout(Duration.ofSeconds(10)) + .build()) + .build(); + + PubSubSinkV2<String> sink = + PubSubSinkV2.<String>builder() + .setProjectId(PROJECT_ID) + .setTopicId(TOPIC_ID) + .setSerializationSchema(new SimpleStringSchema()) + .setGcpPublisherConfig(gcpPublisherConfig) + .setFailOnError(true) + .build(); + stream.sinkTo(sink); + Assertions.assertThatExceptionOfType(JobExecutionException.class) + .isThrownBy(() -> env.execute("PubSubSinkV2ITTests")) + .satisfies( + e -> /* satisfies() takes a Consumer: the check must assert, a bare boolean result is discarded */ + assertThat(ExceptionUtils.findThrowable(e, UnknownHostException.class)) + .isPresent()); + } +} diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java similarity index 82% rename from flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java rename to flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java index 99ce120..4b6ce1e 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/PubsubHelper.java +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/PubsubHelper.java @@ -15,8 +15,12 @@ * limitations under the License. 
*/ -package org.apache.flink.streaming.connectors.gcp.pubsub.emulator; +package org.apache.flink.connector.gcp.pubsub.sink.util; +import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentialsProvider; + +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; import com.google.api.gax.rpc.NotFoundException; import com.google.api.gax.rpc.TransportChannelProvider; import com.google.cloud.pubsub.v1.MessageReceiver; @@ -36,11 +40,15 @@ import com.google.pubsub.v1.PushConfig; import com.google.pubsub.v1.ReceivedMessage; import com.google.pubsub.v1.Topic; import com.google.pubsub.v1.TopicName; +import io.grpc.ManagedChannel; +import io.grpc.ManagedChannelBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; /** A helper class to make managing the testing topics a bit easier. 
*/ @@ -48,15 +56,34 @@ public class PubsubHelper { private static final Logger LOG = LoggerFactory.getLogger(PubsubHelper.class); + private static final Duration SHUTDOWN_TIMEOUT = Duration.ofSeconds(5); + + private ManagedChannel channel; + private TransportChannelProvider channelProvider; private TopicAdminClient topicClient; + private SubscriptionAdminClient subscriptionAdminClient; + public PubsubHelper(String endpoint) { + channel = ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); + channelProvider = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)); + } + public PubsubHelper(TransportChannelProvider channelProvider) { this.channelProvider = channelProvider; } + public TransportChannelProvider getChannelProvider() { + return channelProvider; + } + + public ManagedChannel getChannel() { + return channel; + } + public TopicAdminClient getTopicAdminClient() throws IOException { if (topicClient == null) { TopicAdminSettings topicAdminSettings = @@ -90,12 +117,6 @@ public class PubsubHelper { return; } - // If it exists we delete all subscriptions and the topic itself. 
- LOG.info("DeleteTopic {} first delete old subscriptions.", topicName); - adminClient - .listTopicSubscriptions(topicName) - .iterateAll() - .forEach(subscriptionAdminClient::deleteSubscription); LOG.info("DeleteTopic {}", topicName); adminClient.deleteTopic(topicName); } @@ -222,4 +243,34 @@ public class PubsubHelper { .setCredentialsProvider(EmulatorCredentialsProvider.create()) .build(); } + + public void close() { + if (topicClient != null) { + try { + topicClient.shutdown(); + topicClient.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Error shutting down topic client", e); + } + } + + if (subscriptionAdminClient != null) { + try { + subscriptionAdminClient.shutdown(); + subscriptionAdminClient.awaitTermination( + SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Error shutting down subscription admin client", e); + } + } + + if (channel != null) { + try { + channel.shutdown(); + channel.awaitTermination(SHUTDOWN_TIMEOUT.getSeconds(), TimeUnit.SECONDS); + } catch (InterruptedException e) { + LOG.warn("Error shutting down channel", e); + } + } + } } diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java new file mode 100644 index 0000000..b33f601 --- /dev/null +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/util/TestChannelProvider.java @@ -0,0 +1,26 @@ +package org.apache.flink.connector.gcp.pubsub.sink.util; + +import org.apache.flink.connector.gcp.pubsub.sink.config.SerializableTransportChannelProvider; + +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.rpc.FixedTransportChannelProvider; +import io.grpc.ManagedChannel; +import 
io.grpc.ManagedChannelBuilder; + +/** A test channel provider for {@link GrpcTransportChannel}. */ +public class TestChannelProvider extends SerializableTransportChannelProvider { + + private final String endpoint; + + public TestChannelProvider(String endpoint) { + this.endpoint = endpoint; + } + + @Override + protected void open() { + ManagedChannel managedChannel = + ManagedChannelBuilder.forTarget(endpoint).usePlaintext().build(); + this.transportChannelProvider = + FixedTransportChannelProvider.create(GrpcTransportChannel.create(managedChannel)); + } +} diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java index b66bea6..22ea754 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/CheckPubSubEmulatorTest.java @@ -17,8 +17,8 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; -import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; import com.google.cloud.pubsub.v1.Publisher; import com.google.cloud.pubsub.v1.Subscriber; diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java index a186971..f84320f 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java +++ 
b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedFullTopologyTest.java @@ -20,11 +20,11 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; -import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java index e1aa486..692a195 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSinkTest.java @@ -20,12 +20,12 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; import org.apache.flink.streaming.api.datastream.DataStream; import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; -import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; import com.google.pubsub.v1.ReceivedMessage; import org.apache.commons.lang3.StringUtils; diff --git a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java index 58597fd..4fbfbdc 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/EmulatedPubSubSourceTest.java @@ -18,13 +18,13 @@ package org.apache.flink.streaming.connectors.gcp.pubsub; import org.apache.flink.api.common.restartstrategy.RestartStrategies; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamUtils; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.EmulatorCredentials; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.GCloudUnitTestBase; import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubSubSubscriberFactoryForEmulator; -import org.apache.flink.streaming.connectors.gcp.pubsub.emulator.PubsubHelper; import com.google.cloud.pubsub.v1.Publisher; import com.google.protobuf.ByteString; diff --git 
a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java index c26554e..cc0c366 100644 --- a/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java +++ b/flink-connector-gcp-pubsub-e2e-tests/src/test/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/GCloudUnitTestBase.java @@ -17,6 +17,7 @@ package org.apache.flink.streaming.connectors.gcp.pubsub.emulator; +import org.apache.flink.connector.gcp.pubsub.sink.util.PubsubHelper; import org.apache.flink.streaming.connectors.gcp.pubsub.test.DockerImageVersions; import org.apache.flink.util.TestLogger; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java index e4bebd2..f1449af 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/GcpPublisherConfig.java @@ -13,6 +13,7 @@ import java.io.Serializable; /** Configuration keys for {@link com.google.cloud.pubsub.v1.Publisher}. 
*/ @PublicEvolving public class GcpPublisherConfig implements Serializable { + private static final long serialVersionUID = 1L; private final RetrySettings retrySettings; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java index 97fc189..4589598 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/config/SerializableTransportChannelProvider.java @@ -12,6 +12,7 @@ import java.io.Serializable; */ @PublicEvolving public abstract class SerializableTransportChannelProvider implements Serializable { + private static final long serialVersionUID = 1L; protected transient TransportChannelProvider transportChannelProvider; diff --git a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java index 5ea454e..42849e4 100644 --- a/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java +++ b/flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/emulator/EmulatorCredentialsProvider.java @@ -20,12 +20,16 @@ package org.apache.flink.streaming.connectors.gcp.pubsub.emulator; import com.google.api.gax.core.CredentialsProvider; import com.google.auth.Credentials; +import java.io.Serializable; + /** * A CredentialsProvider that simply provides the right credentials that are to be used for * connecting to an emulator. 
NOTE: The Google provided NoCredentials and NoCredentialsProvider do * not behave as expected. See https://github.com/googleapis/gax-java/issues/1148 */ -public final class EmulatorCredentialsProvider implements CredentialsProvider { +public final class EmulatorCredentialsProvider implements CredentialsProvider, Serializable { + private static final long serialVersionUID = 1L; + @Override public Credentials getCredentials() { return EmulatorCredentials.getInstance();