Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin merged PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2133038364 @snuyanzin I addressed the introduced dependency issue, and the feedback comments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1615752158 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/common/PubSubConstants.java: ## @@ -0,0 +1,11 @@ +package org.apache.flink.connector.gcp.pubsub.common; + +import org.apache.flink.annotation.PublicEvolving; + +/** Constants for PubSub. */ +@PublicEvolving +public class PubSubConstants { Review Comment: It is supposed to be base for extension with other constants, but I agree let's leave that when we actually need it -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2132266341 it seems dependency convergence issue should be resolved -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1615240064 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/common/PubSubConstants.java: ## @@ -0,0 +1,11 @@ +package org.apache.flink.connector.gcp.pubsub.common; + +import org.apache.flink.annotation.PublicEvolving; + +/** Constants for PubSub. */ +@PublicEvolving +public class PubSubConstants { Review Comment: Do we really need a separate class for these constants if they are used only in `PubSubSinkV2Builder` and `PubSubSinkV2BuilderTest` ? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
morazow commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1603322536 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; + +/** + * Flag to indicate whether to fail on errors, if unset the writer will retry non-fatal request + * failures. + */ +private final boolean failOnError; + +private long inflightRequests = 0; + +private final MailboxExecutor mailboxExecutor; +private final Counter numBytesOutCounter; +private final Counter numRecordsOutCounter; +private final Counter numRecordsOutErrorCounter; +private final SerializationSchema serializationSchema; + +PubSubWriter( +String projectId, +String topicId, +SerializationSchema serializationSchema, +WriterInitContext context, +GcpPublisherConfig publisherConfig, +long maximumInflightRequests, +boolean failOnError) +throws IOException { +this( +createPublisher(projectId, topicId, publisherConfig), +context, +serializationSchema, +maximumInflightRequests, +failOnError); +} + +@VisibleForTesting +PubSubWriter( +GcpWriterClient publisher, +WriterInitContext context, +SerializationSchema serializationSchema, +long maximumInflightRequests, +boolean failOnError) { +this.publisher = Preconditions.checkNotNull(publisher); +this.serializationSchema = Preconditions.checkNotNull(serializationSchema); +Preconditions.checkNotNull(context, "Context cannot be null."); + +this.mailboxExecutor = context.getMailboxExecutor(); +this.numBytesOutCounter = context.metricGroup().getIOMetricGroup().getNumBytesOutCounter(); +this.numRecordsOutCounter = + context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter(); +this.numRecordsOutErrorCounter = context.metricGroup().getNumRecordsOutErrorsCounter(); +this.maximumInflightRequests = maximumInflightRequests; +this.failOnError = failOnError; +} + +@Override +public void write(T t, SinkWriter.Context context) throws IOException, InterruptedException { +awaitMaxInflightRequestsBelow(maximumInflightRequests); +PubsubMessage message = +PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(serializationSchema.serialize(t))) +.build(); +publishMessage(message); +} + +@Override +public void flush(boolean b) throws IOException, InterruptedException { +publisher.flush(); +awaitMaxInflightRequestsBelow(1); +} + +private void awaitMaxInflightRequestsBelow(long maxInflightRequests) +throws InterruptedException { +while (inflightRequests >= maxInflightRequests) { +mailboxExecutor.yield(); +} +} + +private void
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2114737215 @snuyanzin thanks for the feedback, all addressed now, could you have another round please? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2114736463 @morazow @affo Thanks for the feedback, addressed you comments -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602999204 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; + +/** + * Flag to indicate whether to fail on errors, if unset the writer will retry non-fatal request + * failures. + */ +private final boolean failOnError; + +private long inflightRequests = 0; + +private final MailboxExecutor mailboxExecutor; +private final Counter numBytesOutCounter; +private final Counter numRecordsOutCounter; +private final Counter numRecordsOutErrorCounter; +private final SerializationSchema serializationSchema; + +PubSubWriter( +String projectId, +String topicId, +SerializationSchema serializationSchema, +WriterInitContext context, +GcpPublisherConfig publisherConfig, +long maximumInflightRequests, +boolean failOnError) +throws IOException { +this( +createPublisher(projectId, topicId, publisherConfig), +context, +serializationSchema, +maximumInflightRequests, +failOnError); +} + +@VisibleForTesting +PubSubWriter( +GcpWriterClient publisher, +WriterInitContext context, +SerializationSchema serializationSchema, +long maximumInflightRequests, +boolean failOnError) { +this.publisher = Preconditions.checkNotNull(publisher); +this.serializationSchema = Preconditions.checkNotNull(serializationSchema); +Preconditions.checkNotNull(context, "Context cannot be null."); + +this.mailboxExecutor = context.getMailboxExecutor(); +this.numBytesOutCounter = context.metricGroup().getIOMetricGroup().getNumBytesOutCounter(); +this.numRecordsOutCounter = + context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter(); +this.numRecordsOutErrorCounter = context.metricGroup().getNumRecordsOutErrorsCounter(); +this.maximumInflightRequests = maximumInflightRequests; +this.failOnError = failOnError; +} + +@Override +public void write(T t, SinkWriter.Context context) throws IOException, InterruptedException { +awaitMaxInflightRequestsBelow(maximumInflightRequests); +PubsubMessage message = +PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(serializationSchema.serialize(t))) +.build(); +publishMessage(message); +} + +@Override +public void flush(boolean b) throws IOException, InterruptedException { +publisher.flush(); +awaitMaxInflightRequestsBelow(1); +} + +private void awaitMaxInflightRequestsBelow(long maxInflightRequests) +throws InterruptedException { +while (inflightRequests >= maxInflightRequests) { +mailboxExecutor.yield(); +} +} + +private void
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602993681 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests exceeds the specified limit. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; + +/** + * Flag to indicate whether to fail on errors, if unset the writer will retry non-fatal request + * failures. + */ +private final boolean failOnError; + +private long inflightRequests = 0; + +private final MailboxExecutor mailboxExecutor; +private final Counter numBytesOutCounter; +private final Counter numRecordsOutCounter; +private final Counter numRecordsOutErrorCounter; +private final SerializationSchema serializationSchema; + +PubSubWriter( +String projectId, +String topicId, +SerializationSchema serializationSchema, +WriterInitContext context, +GcpPublisherConfig publisherConfig, +long maximumInflightRequests, +boolean failOnError) +throws IOException { +this( +createPublisher(projectId, topicId, publisherConfig), +context, +serializationSchema, +maximumInflightRequests, +failOnError); +} + +@VisibleForTesting +PubSubWriter( +GcpWriterClient publisher, +WriterInitContext context, +SerializationSchema serializationSchema, +long maximumInflightRequests, +boolean failOnError) { +this.publisher = Preconditions.checkNotNull(publisher); +this.serializationSchema = Preconditions.checkNotNull(serializationSchema); +Preconditions.checkNotNull(context, "Context cannot be null."); + +this.mailboxExecutor = context.getMailboxExecutor(); +this.numBytesOutCounter = context.metricGroup().getIOMetricGroup().getNumBytesOutCounter(); +this.numRecordsOutCounter = + context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter(); +this.numRecordsOutErrorCounter = context.metricGroup().getNumRecordsOutErrorsCounter(); +this.maximumInflightRequests = maximumInflightRequests; +this.failOnError = failOnError; +} + +@Override +public void write(T t, SinkWriter.Context context) throws IOException, InterruptedException { +awaitMaxInflightRequestsBelow(maximumInflightRequests); +PubsubMessage message = +PubsubMessage.newBuilder() + .setData(ByteString.copyFrom(serializationSchema.serialize(t))) +.build(); +publishMessage(message); +} + +@Override +public void flush(boolean b) throws IOException, InterruptedException { +publisher.flush(); +awaitMaxInflightRequestsBelow(1); Review Comment: @affo the inflight requests callback is queued in the mailbox, we yield here for callback executions, shall requests be sent successfully the call back should reset the inflight requests back to 0 exiting the loop --
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602978021 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; + +public PubSubSinkV2Builder setProjectId(String projectId) { +this.projectId = projectId; +return this; +} + +public PubSubSinkV2Builder setTopicId(String topicId) { +this.topicId = topicId; +return this; +} + +public PubSubSinkV2Builder setSerializationSchema( +SerializationSchema serializationSchema) { +this.serializationSchema = serializationSchema; +return this; +} + +public PubSubSinkV2Builder setGcpPublisherConfig(GcpPublisherConfig gcpPublisherConfig) { +this.gcpPublisherConfig = gcpPublisherConfig; +return this; +} + +public PubSubSinkV2Builder setNumMaxInflightRequests(int numMaxInflightRequests) { +this.numMaxInflightRequests = numMaxInflightRequests; Review Comment: This is validated in sink constructor ``` Preconditions.checkArgument( maxInFlightRequests > 0, "Max in-flight requests must be greater than 0."); ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602973242 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests exceeds the specified limit. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; Review Comment: Agreed, will use `int` for both -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602970403 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java: ## @@ -0,0 +1,133 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import com.google.api.gax.core.NoCredentialsProvider; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** Tests for {@link PubSubSinkV2Builder}. */ +public class PubSubSinkV2BuilderTest { + +@Test +void builderBuildsSinkWithCorrectProperties() { +PubSubSinkV2Builder builder = PubSubSinkV2.builder(); +GcpPublisherConfig gcpPublisherConfig = +GcpPublisherConfig.builder() +.setCredentialsProvider(NoCredentialsProvider.create()) +.build(); + +SerializationSchema serializationSchema = new SimpleStringSchema(); + +builder.setProjectId("test-project-id") +.setTopicId("test-topic-id") +.setGcpPublisherConfig(gcpPublisherConfig) +.setSerializationSchema(serializationSchema) +.setNumMaxInflightRequests(10) +.setFailOnError(true); +PubSubSinkV2 sink = builder.build(); + +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("projectId", "test-project-id"); +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("topicId", "test-topic-id"); +Assertions.assertThat(sink) +.hasFieldOrPropertyWithValue("serializationSchema", serializationSchema); +Assertions.assertThat(sink) +.hasFieldOrPropertyWithValue("publisherConfig", gcpPublisherConfig); + Assertions.assertThat(sink).hasFieldOrPropertyWithValue("maxInFlightRequests", 10); +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("failOnError", true); Review Comment: yeah, that would make more sense here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602966998 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; Review Comment: They are nullable to use defaults if not set with builder, they are never nullable when passed to the sink though ``` Optional.ofNullable(numMaxInflightRequests) .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS), Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602966998 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; Review Comment: They are nullable to use defaults when needed ``` Optional.ofNullable(numMaxInflightRequests) .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS), Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR)) ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602964623 ## .github/workflows/weekly.yml: ## @@ -37,10 +37,6 @@ jobs: flink: 1.19-SNAPSHOT, jdk: '8, 11, 17, 21', branch: main -}, { - flink: 1.18-SNAPSHOT, - jdk: '8, 11, 17', - branch: main Review Comment: Good catch! Will remove from pr flow -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
affo commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602735147 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link Review Comment: Following up on @snuyanzin comments, let's keep the consistency, either `Pub/Sub` or `PubSub` ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests exceeds the specified limit. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; + +/** + * Flag to indicate whether to fail on errors, if unset the writer will retry non-fatal request + * failures. + */ +private final boolean failOnError; + +private long inflightRequests = 0; + +private final MailboxExecutor mailboxExecutor; +private final Counter numBytesOutCounter; +private final Counter numRecordsOutCounter; +private final Counter numRecordsOutErrorCounter; +private final SerializationSchema serializationSchema; + +PubSubWriter( +String projectId, +String topicId, +SerializationSchema serializationSchema, +WriterInitContext context, +GcpPublisherConfig publisherConfig, +long maximumInflightRequests, +boolean failOnError) +throws IOException { +this( +createPublisher(projectId, topicId, publisherConfig), +context, +serializationSchema, +maximumInflightRequests, +failOnError); +} + +@VisibleForTesting +PubSubWriter( +GcpWriterClient publisher, +WriterInitContext context, +SerializationSchema serializationSchema, +long maximumInflightRequests, +boolean failOnError) { +this.publisher = Preconditions.checkNotNull(publisher); +this.serializationSchema =
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601908776 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; + +public PubSubSinkV2Builder setProjectId(String projectId) { +this.projectId = projectId; +return this; +} + +public PubSubSinkV2Builder setTopicId(String topicId) { +this.topicId = topicId; +return this; +} + +public PubSubSinkV2Builder setSerializationSchema( +SerializationSchema serializationSchema) { +this.serializationSchema = serializationSchema; +return this; +} + +public PubSubSinkV2Builder setGcpPublisherConfig(GcpPublisherConfig gcpPublisherConfig) { +this.gcpPublisherConfig = gcpPublisherConfig; +return this; +} + +public PubSubSinkV2Builder setNumMaxInflightRequests(int numMaxInflightRequests) { +this.numMaxInflightRequests = numMaxInflightRequests; Review Comment: what happens if we pass a negative number? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601907967 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests exceeds the specified limit. + * + * @param The type of the records . + */ +@Internal +public class PubSubWriter implements SinkWriter { + +/** The PubSub generic client to publish messages. */ +private final GcpWriterClient publisher; + +/** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ +private final long maximumInflightRequests; Review Comment: Can we align types? here it is `long` while in `PubSubSinkV2Builder` it is `int` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601895646 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. Review Comment: ```suggestion * {@code maximumInflightMessages} for the maximum number of inflight messages. ``` i would suggest we use the same word everywhere rather then it's different variations -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601877099 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java: ## @@ -0,0 +1,245 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusRuntimeException; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** Tests for {@link PubSubWriter}. */ +public class PubSubWriterTest { + +private static final int MAXIMUM_INFLIGHT_MESSAGES = 3; + +@Test +void writeMessageDeliversMessageUsingClient() throws IOException, InterruptedException { +TestGcpWriterClient client = new TestGcpWriterClient(); +PubSubWriter writer = +getDefaultWriter(client, new TestSinkInitContext(), MAXIMUM_INFLIGHT_MESSAGES); +String message = "test-message"; +writer.write(message, null); +client.deliverMessage(message); +writer.flush(false); + + Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message); +} + +@Test +void writeMessageIncrementsMetricsOnDelivery() throws IOException, InterruptedException { +TestGcpWriterClient client = new TestGcpWriterClient(); +WriterInitContext context = new TestSinkInitContext(); + +PubSubWriter writer = getDefaultWriter(client, context, MAXIMUM_INFLIGHT_MESSAGES); +String message = "test-message"; + +// write message +writer.write(message, null); + +// get metrics before delivery +Counter numBytesOutCounter = context.metricGroup().getNumBytesSendCounter(); +Counter numRecordsSendCounter = context.metricGroup().getNumRecordsSendCounter(); +long recordsSentBeforeDelivery = numRecordsSendCounter.getCount(); +long bytesSentBeforeDelivery = numBytesOutCounter.getCount(); + +// deliver message +client.deliverMessage(message); +writer.flush(false); + +long messageSize = +PubsubMessage.newBuilder() +.setData(ByteString.copyFromUtf8(message)) +.build() +.getSerializedSize(); + +Assertions.assertThat(recordsSentBeforeDelivery).isEqualTo(0); +Assertions.assertThat(bytesSentBeforeDelivery).isEqualTo(0); +Assertions.assertThat(numRecordsSendCounter.getCount()).isEqualTo(1); + Assertions.assertThat(numBytesOutCounter.getCount()).isEqualTo(messageSize); Review Comment: same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876558 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java: ## @@ -0,0 +1,245 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusRuntimeException; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** Tests for {@link PubSubWriter}. */ +public class PubSubWriterTest { + +private static final int MAXIMUM_INFLIGHT_MESSAGES = 3; + +@Test +void writeMessageDeliversMessageUsingClient() throws IOException, InterruptedException { +TestGcpWriterClient client = new TestGcpWriterClient(); +PubSubWriter writer = +getDefaultWriter(client, new TestSinkInitContext(), MAXIMUM_INFLIGHT_MESSAGES); +String message = "test-message"; +writer.write(message, null); +client.deliverMessage(message); +writer.flush(false); + + Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message); Review Comment: `static import` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876167 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java: ## @@ -0,0 +1,245 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContext; +import org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox; +import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; + +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import io.grpc.StatusRuntimeException; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import java.io.IOException; +import java.util.concurrent.CountDownLatch; + +/** Tests for {@link PubSubWriter}. */ +public class PubSubWriterTest { Review Comment: ```suggestion class PubSubWriterTest { ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601875238 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java: ## @@ -0,0 +1,133 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import com.google.api.gax.core.NoCredentialsProvider; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** Tests for {@link PubSubSinkV2Builder}. */ +public class PubSubSinkV2BuilderTest { Review Comment: ```suggestion class PubSubSinkV2BuilderTest { ``` junit5 doesn't need `public` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601874397 ## flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java: ## @@ -0,0 +1,133 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.common.serialization.SimpleStringSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import com.google.api.gax.core.NoCredentialsProvider; +import org.assertj.core.api.Assertions; +import org.junit.jupiter.api.Test; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** Tests for {@link PubSubSinkV2Builder}. */ +public class PubSubSinkV2BuilderTest { + +@Test +void builderBuildsSinkWithCorrectProperties() { +PubSubSinkV2Builder builder = PubSubSinkV2.builder(); +GcpPublisherConfig gcpPublisherConfig = +GcpPublisherConfig.builder() +.setCredentialsProvider(NoCredentialsProvider.create()) +.build(); + +SerializationSchema serializationSchema = new SimpleStringSchema(); + +builder.setProjectId("test-project-id") +.setTopicId("test-topic-id") +.setGcpPublisherConfig(gcpPublisherConfig) +.setSerializationSchema(serializationSchema) +.setNumMaxInflightRequests(10) +.setFailOnError(true); +PubSubSinkV2 sink = builder.build(); + +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("projectId", "test-project-id"); +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("topicId", "test-topic-id"); +Assertions.assertThat(sink) +.hasFieldOrPropertyWithValue("serializationSchema", serializationSchema); +Assertions.assertThat(sink) +.hasFieldOrPropertyWithValue("publisherConfig", gcpPublisherConfig); + Assertions.assertThat(sink).hasFieldOrPropertyWithValue("maxInFlightRequests", 10); +Assertions.assertThat(sink).hasFieldOrPropertyWithValue("failOnError", true); Review Comment: normally in Flink code we have `static import` for `Assertions`, I think we should follow same here -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601862072 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; Review Comment: why are they not primitives? I see that these values are changed with setters where args are primitives And in `PubSubSinkV2` they are also primitives or did I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601862072 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@code PubSubSinkV2Builder}.builder() + * .setProjectId("project-id") + * .setTopicId("topic-id) + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * } + * + * @param + */ +@PublicEvolving +public class PubSubSinkV2Builder { +private String projectId; +private String topicId; +private SerializationSchema serializationSchema; +private GcpPublisherConfig gcpPublisherConfig; +private Integer numMaxInflightRequests; +private Boolean failOnError; Review Comment: why are they not primitives? I see that these values are changed with setters where args are primitives or did I miss something? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601857522 ## .github/workflows/weekly.yml: ## @@ -37,10 +37,6 @@ jobs: flink: 1.19-SNAPSHOT, jdk: '8, 11, 17, 21', branch: main -}, { - flink: 1.18-SNAPSHOT, - jdk: '8, 11, 17', - branch: main Review Comment: why do we remove 1.18 here and do not do that for pr flow? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601406910 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java: ## @@ -62,6 +62,7 @@ * * @param type of PubSubSink messages to write */ +@Deprecated public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { Review Comment: We can remove this when we release a new major version `4.0` once this is merged we cann follow up with another Jira to remove -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601405551 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@link PubSubSinkV2Builder}.builder() Review Comment: Good catch! -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
morazow commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2112125712 Thanks @vahmed-hamdy, great work! Added some minor suggestions. And a point for number of retries on failure -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
morazow commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601341788 ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * + * {@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * {@link SerializationSchema} for serializing the input data. + * {@code projectId} for the name of the project where the topic is located. + * {@code topicId} for the name of the topic to send messages to. + * {@code maximumInflightMessages} for the maximum number of in-flight messages. + * {@code failOnError} for whether to fail on an error. + * + * + * It can be used as follows: + * + * {@code + * PubSubSinkV2Builder pubSubSink = {@link PubSubSinkV2Builder}.builder() Review Comment: Should the `{@link ` removed here? Since it is already in code block maybe it doesn't render ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2.java: ## @@ -0,0 +1,96 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.Sink; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * A RabbitMQ {@link Sink} to produce data into Gcp PubSub. The sink uses the {@link Review Comment: ```suggestion * A {@link Sink} to produce data into Gcp PubSub. The sink uses the {@link ``` ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java: ## @@ -62,6 +62,7 @@ * * @param type of PubSubSink messages to write */ +@Deprecated public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { Review Comment: ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests. Review Comment: Sentence is not finished? `.. exceeds that number` ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java: ## @@ -62,6 +62,7 @@ * * @param type of PubSubSink messages to write */ +@Deprecated public class PubSubSink extends RichSinkFunction implements CheckpointedFunction { Review Comment: When is good to remove these API? ## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ## @@ -0,0 +1,195 @@ +package
Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]
vahmed-hamdy commented on PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2111082697 @snuyanzin would you mind taking a look, We want to complete this migration since the `SinkFuction` is deprecated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org