Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-06-06 Thread via GitHub


snuyanzin merged PR #27:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-27 Thread via GitHub


vahmed-hamdy commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2133038364

   @snuyanzin I addressed the dependency issue that was introduced, as well as 
the feedback comments.





Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-27 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1615752158


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/common/PubSubConstants.java:
##
@@ -0,0 +1,11 @@
+package org.apache.flink.connector.gcp.pubsub.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Constants for PubSub. */
+@PublicEvolving
+public class PubSubConstants {

Review Comment:
   It is supposed to be a base for extension with other constants, but I agree: 
let's leave that until we actually need it.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-26 Thread via GitHub


snuyanzin commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2132266341

   It seems there is a dependency convergence issue that should be resolved.





Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-26 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1615240064


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/common/PubSubConstants.java:
##
@@ -0,0 +1,11 @@
+package org.apache.flink.connector.gcp.pubsub.common;
+
+import org.apache.flink.annotation.PublicEvolving;
+
+/** Constants for PubSub. */
+@PublicEvolving
+public class PubSubConstants {

Review Comment:
   Do we really need a separate class for these constants if they are used only 
in `PubSubSinkV2Builder` and `PubSubSinkV2BuilderTest`?






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


morazow commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1603322536


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the
+ * number of inflight requests.
+ *
+ * @param <T> The type of the records.
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;
+
+/**
+ * Flag to indicate whether to fail on errors, if unset the writer will 
retry non-fatal request
+ * failures.
+ */
+private final boolean failOnError;
+
+private long inflightRequests = 0;
+
+private final MailboxExecutor mailboxExecutor;
+private final Counter numBytesOutCounter;
+private final Counter numRecordsOutCounter;
+private final Counter numRecordsOutErrorCounter;
+private final SerializationSchema serializationSchema;
+
+PubSubWriter(
+String projectId,
+String topicId,
+SerializationSchema serializationSchema,
+WriterInitContext context,
+GcpPublisherConfig publisherConfig,
+long maximumInflightRequests,
+boolean failOnError)
+throws IOException {
+this(
+createPublisher(projectId, topicId, publisherConfig),
+context,
+serializationSchema,
+maximumInflightRequests,
+failOnError);
+}
+
+@VisibleForTesting
+PubSubWriter(
+GcpWriterClient publisher,
+WriterInitContext context,
+SerializationSchema serializationSchema,
+long maximumInflightRequests,
+boolean failOnError) {
+this.publisher = Preconditions.checkNotNull(publisher);
+this.serializationSchema = 
Preconditions.checkNotNull(serializationSchema);
+Preconditions.checkNotNull(context, "Context cannot be null.");
+
+this.mailboxExecutor = context.getMailboxExecutor();
+this.numBytesOutCounter = 
context.metricGroup().getIOMetricGroup().getNumBytesOutCounter();
+this.numRecordsOutCounter =
+
context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter();
+this.numRecordsOutErrorCounter = 
context.metricGroup().getNumRecordsOutErrorsCounter();
+this.maximumInflightRequests = maximumInflightRequests;
+this.failOnError = failOnError;
+}
+
+@Override
+public void write(T t, SinkWriter.Context context) throws IOException, 
InterruptedException {
+awaitMaxInflightRequestsBelow(maximumInflightRequests);
+PubsubMessage message =
+PubsubMessage.newBuilder()
+
.setData(ByteString.copyFrom(serializationSchema.serialize(t)))
+.build();
+publishMessage(message);
+}
+
+@Override
+public void flush(boolean b) throws IOException, InterruptedException {
+publisher.flush();
+awaitMaxInflightRequestsBelow(1);
+}
+
+private void awaitMaxInflightRequestsBelow(long maxInflightRequests)
+throws InterruptedException {
+while (inflightRequests >= maxInflightRequests) {
+mailboxExecutor.yield();
+}
+}
+
+private void 

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2114737215

   @snuyanzin Thanks for the feedback; it is all addressed now. Could you do 
another round of review, please?





Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2114736463

   @morazow @affo Thanks for the feedback, I have addressed your comments.





Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602999204


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the
+ * number of inflight requests.
+ *
+ * @param <T> The type of the records.
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;
+
+/**
+ * Flag to indicate whether to fail on errors, if unset the writer will 
retry non-fatal request
+ * failures.
+ */
+private final boolean failOnError;
+
+private long inflightRequests = 0;
+
+private final MailboxExecutor mailboxExecutor;
+private final Counter numBytesOutCounter;
+private final Counter numRecordsOutCounter;
+private final Counter numRecordsOutErrorCounter;
+private final SerializationSchema serializationSchema;
+
+PubSubWriter(
+String projectId,
+String topicId,
+SerializationSchema serializationSchema,
+WriterInitContext context,
+GcpPublisherConfig publisherConfig,
+long maximumInflightRequests,
+boolean failOnError)
+throws IOException {
+this(
+createPublisher(projectId, topicId, publisherConfig),
+context,
+serializationSchema,
+maximumInflightRequests,
+failOnError);
+}
+
+@VisibleForTesting
+PubSubWriter(
+GcpWriterClient publisher,
+WriterInitContext context,
+SerializationSchema serializationSchema,
+long maximumInflightRequests,
+boolean failOnError) {
+this.publisher = Preconditions.checkNotNull(publisher);
+this.serializationSchema = 
Preconditions.checkNotNull(serializationSchema);
+Preconditions.checkNotNull(context, "Context cannot be null.");
+
+this.mailboxExecutor = context.getMailboxExecutor();
+this.numBytesOutCounter = 
context.metricGroup().getIOMetricGroup().getNumBytesOutCounter();
+this.numRecordsOutCounter =
+
context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter();
+this.numRecordsOutErrorCounter = 
context.metricGroup().getNumRecordsOutErrorsCounter();
+this.maximumInflightRequests = maximumInflightRequests;
+this.failOnError = failOnError;
+}
+
+@Override
+public void write(T t, SinkWriter.Context context) throws IOException, 
InterruptedException {
+awaitMaxInflightRequestsBelow(maximumInflightRequests);
+PubsubMessage message =
+PubsubMessage.newBuilder()
+
.setData(ByteString.copyFrom(serializationSchema.serialize(t)))
+.build();
+publishMessage(message);
+}
+
+@Override
+public void flush(boolean b) throws IOException, InterruptedException {
+publisher.flush();
+awaitMaxInflightRequestsBelow(1);
+}
+
+private void awaitMaxInflightRequestsBelow(long maxInflightRequests)
+throws InterruptedException {
+while (inflightRequests >= maxInflightRequests) {
+mailboxExecutor.yield();
+}
+}
+
+private void 

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602993681


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the
+ * number of inflight requests exceeds the specified limit.
+ *
+ * @param <T> The type of the records.
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;
+
+/**
+ * Flag to indicate whether to fail on errors, if unset the writer will 
retry non-fatal request
+ * failures.
+ */
+private final boolean failOnError;
+
+private long inflightRequests = 0;
+
+private final MailboxExecutor mailboxExecutor;
+private final Counter numBytesOutCounter;
+private final Counter numRecordsOutCounter;
+private final Counter numRecordsOutErrorCounter;
+private final SerializationSchema serializationSchema;
+
+PubSubWriter(
+String projectId,
+String topicId,
+SerializationSchema serializationSchema,
+WriterInitContext context,
+GcpPublisherConfig publisherConfig,
+long maximumInflightRequests,
+boolean failOnError)
+throws IOException {
+this(
+createPublisher(projectId, topicId, publisherConfig),
+context,
+serializationSchema,
+maximumInflightRequests,
+failOnError);
+}
+
+@VisibleForTesting
+PubSubWriter(
+GcpWriterClient publisher,
+WriterInitContext context,
+SerializationSchema serializationSchema,
+long maximumInflightRequests,
+boolean failOnError) {
+this.publisher = Preconditions.checkNotNull(publisher);
+this.serializationSchema = 
Preconditions.checkNotNull(serializationSchema);
+Preconditions.checkNotNull(context, "Context cannot be null.");
+
+this.mailboxExecutor = context.getMailboxExecutor();
+this.numBytesOutCounter = 
context.metricGroup().getIOMetricGroup().getNumBytesOutCounter();
+this.numRecordsOutCounter =
+
context.metricGroup().getIOMetricGroup().getNumRecordsOutCounter();
+this.numRecordsOutErrorCounter = 
context.metricGroup().getNumRecordsOutErrorsCounter();
+this.maximumInflightRequests = maximumInflightRequests;
+this.failOnError = failOnError;
+}
+
+@Override
+public void write(T t, SinkWriter.Context context) throws IOException, 
InterruptedException {
+awaitMaxInflightRequestsBelow(maximumInflightRequests);
+PubsubMessage message =
+PubsubMessage.newBuilder()
+
.setData(ByteString.copyFrom(serializationSchema.serialize(t)))
+.build();
+publishMessage(message);
+}
+
+@Override
+public void flush(boolean b) throws IOException, InterruptedException {
+publisher.flush();
+awaitMaxInflightRequestsBelow(1);

Review Comment:
   @affo The callbacks for inflight requests are queued in the mailbox, and we 
yield here so that those callbacks can execute. If the requests are sent 
successfully, the callbacks should bring the inflight request count back down 
to 0, which exits the loop.
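
The yield loop described above can be illustrated with a minimal, dependency-free sketch. It does not use Flink's `MailboxExecutor`; the class `MailboxYieldSketch`, its queue-based "mailbox", and the methods `send()` and `yieldOnce()` are hypothetical stand-ins chosen for illustration. Unlike a real mailbox, which would wait for mail to arrive, this toy version throws if nothing is queued.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class MailboxYieldSketch {
    // Toy stand-in for a task mailbox: completion callbacks wait here until run.
    private final Queue<Runnable> mailbox = new ArrayDeque<>();
    private int inflightRequests = 0;

    // Simulates sending a request whose completion callback is queued in the mailbox.
    public void send() {
        inflightRequests++;
        mailbox.add(() -> inflightRequests--);
    }

    // Runs exactly one queued callback, mimicking a single yield.
    private void yieldOnce() {
        Runnable mail = mailbox.poll();
        if (mail == null) {
            throw new IllegalStateException("No pending callbacks to run");
        }
        mail.run();
    }

    // Keeps yielding until the inflight count drops below the given limit.
    public void awaitInflightRequestsBelow(int limit) {
        while (inflightRequests >= limit) {
            yieldOnce();
        }
    }

    public int inflight() {
        return inflightRequests;
    }

    public static void main(String[] args) {
        MailboxYieldSketch writer = new MailboxYieldSketch();
        writer.send();
        writer.send();
        writer.send();
        // A limit of 1 means "wait until nothing is inflight", as in flush().
        writer.awaitInflightRequestsBelow(1);
        System.out.println(writer.inflight()); // prints 0
    }
}
```

The point of the sketch is that the counter is only ever touched by code run from the mailbox loop, so no extra synchronization is needed on it.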




Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602978021


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id")
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;
+
+public PubSubSinkV2Builder setProjectId(String projectId) {
+this.projectId = projectId;
+return this;
+}
+
+public PubSubSinkV2Builder setTopicId(String topicId) {
+this.topicId = topicId;
+return this;
+}
+
+public PubSubSinkV2Builder setSerializationSchema(
+SerializationSchema serializationSchema) {
+this.serializationSchema = serializationSchema;
+return this;
+}
+
+public PubSubSinkV2Builder setGcpPublisherConfig(GcpPublisherConfig 
gcpPublisherConfig) {
+this.gcpPublisherConfig = gcpPublisherConfig;
+return this;
+}
+
+public PubSubSinkV2Builder setNumMaxInflightRequests(int 
numMaxInflightRequests) {
+this.numMaxInflightRequests = numMaxInflightRequests;

Review Comment:
   This is validated in the sink constructor:
   ```
   Preconditions.checkArgument(
           maxInFlightRequests > 0, "Max in-flight requests must be greater than 0.");
   ```
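
As a rough illustration of validating in the constructor rather than in the builder setter, here is a self-contained sketch. It replaces Flink's `Preconditions.checkArgument` with a plain `IllegalArgumentException` so that it runs without dependencies; the class name `InflightLimitValidation` is hypothetical and is not the PR's sink class.

```java
public class InflightLimitValidation {
    private final int maxInFlightRequests;

    // The constructor is the single place where the limit is checked,
    // so any builder or caller that reaches it gets the same validation.
    public InflightLimitValidation(int maxInFlightRequests) {
        if (maxInFlightRequests <= 0) {
            throw new IllegalArgumentException(
                    "Max in-flight requests must be greater than 0.");
        }
        this.maxInFlightRequests = maxInFlightRequests;
    }

    public int getMaxInFlightRequests() {
        return maxInFlightRequests;
    }

    public static void main(String[] args) {
        System.out.println(new InflightLimitValidation(10).getMaxInFlightRequests());
        try {
            new InflightLimitValidation(0);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

One trade-off of this placement is that an invalid value is only reported when the sink is built, not at the setter call that supplied it.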






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602973242


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the
+ * number of inflight requests exceeds the specified limit.
+ *
+ * @param <T> The type of the records.
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;

Review Comment:
   Agreed, will use `int` for both 






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602970403


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java:
##
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/** Tests for {@link PubSubSinkV2Builder}. */
+public class PubSubSinkV2BuilderTest {
+
+@Test
+void builderBuildsSinkWithCorrectProperties() {
+PubSubSinkV2Builder builder = PubSubSinkV2.builder();
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+.setCredentialsProvider(NoCredentialsProvider.create())
+.build();
+
+SerializationSchema serializationSchema = new 
SimpleStringSchema();
+
+builder.setProjectId("test-project-id")
+.setTopicId("test-topic-id")
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setSerializationSchema(serializationSchema)
+.setNumMaxInflightRequests(10)
+.setFailOnError(true);
+PubSubSinkV2 sink = builder.build();
+
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("projectId", 
"test-project-id");
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("topicId", 
"test-topic-id");
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("serializationSchema", 
serializationSchema);
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("publisherConfig", 
gcpPublisherConfig);
+
Assertions.assertThat(sink).hasFieldOrPropertyWithValue("maxInFlightRequests", 
10);
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("failOnError", 
true);

Review Comment:
   Yeah, that would make more sense here.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602966998


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id")
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;

Review Comment:
   They are nullable so that defaults can be used when they are not set on the 
builder; they are never null when passed to the sink, though:
   ```
   Optional.ofNullable(numMaxInflightRequests)
           .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS),
   Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR))
   ```
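
A minimal sketch of the nullable-field pattern described above, assuming a simplified builder. The class `BuilderDefaultsSketch`, the `resolved...` accessors, and the two default values are hypothetical; the actual defaults in `PubSubConstants` are not shown in this thread.

```java
import java.util.Optional;

public class BuilderDefaultsSketch {
    // Hypothetical defaults for illustration only; the PR's real values are not quoted here.
    static final int DEFAULT_MAXIMUM_INFLIGHT_REQUESTS = 1000;
    static final boolean DEFAULT_FAIL_ON_ERROR = false;

    // Boxed types so that "not set" can be represented as null.
    private Integer numMaxInflightRequests;
    private Boolean failOnError;

    public BuilderDefaultsSketch setNumMaxInflightRequests(int numMaxInflightRequests) {
        this.numMaxInflightRequests = numMaxInflightRequests;
        return this;
    }

    public BuilderDefaultsSketch setFailOnError(boolean failOnError) {
        this.failOnError = failOnError;
        return this;
    }

    // Resolved values are primitives, so nothing nullable leaves the builder.
    public int resolvedMaxInflightRequests() {
        return Optional.ofNullable(numMaxInflightRequests)
                .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS);
    }

    public boolean resolvedFailOnError() {
        return Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR);
    }

    public static void main(String[] args) {
        BuilderDefaultsSketch unset = new BuilderDefaultsSketch();
        System.out.println(unset.resolvedMaxInflightRequests()); // prints 1000
        BuilderDefaultsSketch set = new BuilderDefaultsSketch().setNumMaxInflightRequests(10);
        System.out.println(set.resolvedMaxInflightRequests()); // prints 10
    }
}
```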






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602966998


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id)
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;

Review Comment:
   They are nullable so that the defaults are used when a value is not set:
   ```
   Optional.ofNullable(numMaxInflightRequests)
           .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS),
   Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR))
   ```
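
For illustration, here is a minimal, self-contained sketch of this pattern. The class names and the default values are assumptions made up for the example; the real defaults live in `PubSubConstants` and the real builder is `PubSubSinkV2Builder`.

```java
import java.util.Optional;

// Hypothetical, simplified stand-in for the builder discussed above; not the connector's code.
class NullableDefaultsSketch {
    // Assumed default values, chosen only for this example.
    static final int DEFAULT_MAXIMUM_INFLIGHT_REQUESTS = 1000;
    static final boolean DEFAULT_FAIL_ON_ERROR = false;

    /** Immutable result that holds primitives, like the sink itself. */
    static final class Settings {
        final int maxInflightRequests;
        final boolean failOnError;

        Settings(int maxInflightRequests, boolean failOnError) {
            this.maxInflightRequests = maxInflightRequests;
            this.failOnError = failOnError;
        }
    }

    static final class Builder {
        // Boxed types: null means "the caller never set this value".
        private Integer numMaxInflightRequests;
        private Boolean failOnError;

        Builder setNumMaxInflightRequests(int numMaxInflightRequests) {
            this.numMaxInflightRequests = numMaxInflightRequests;
            return this;
        }

        Builder setFailOnError(boolean failOnError) {
            this.failOnError = failOnError;
            return this;
        }

        Settings build() {
            // Fall back to the defaults only for values that were never set.
            return new Settings(
                    Optional.ofNullable(numMaxInflightRequests)
                            .orElse(DEFAULT_MAXIMUM_INFLIGHT_REQUESTS),
                    Optional.ofNullable(failOnError).orElse(DEFAULT_FAIL_ON_ERROR));
        }
    }
}
```

Keeping the builder fields boxed lets `build()` tell "never set" apart from an explicitly chosen value, while the built object still exposes primitives.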






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602964623


##
.github/workflows/weekly.yml:
##
@@ -37,10 +37,6 @@ jobs:
   flink: 1.19-SNAPSHOT,
   jdk: '8, 11, 17, 21',
   branch: main
-}, {
-  flink: 1.18-SNAPSHOT,
-  jdk: '8, 11, 17',
-  branch: main

Review Comment:
   Good catch! I will remove it from the PR flow as well.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-16 Thread via GitHub


affo commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602735147


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link

Review Comment:
   Following up on @snuyanzin's comments, let's stay consistent and use either
`Pub/Sub` or `PubSub` everywhere.



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on 
{@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and 
blocks new writes if the
+ *  number of inflight requests exceeds the specified limit.
+ *
+ * @param  The type of the records .
+ */
+@Internal
+public class PubSubWriter implements SinkWriter {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;
+
+/**
+ * Flag to indicate whether to fail on errors, if unset the writer will 
retry non-fatal request
+ * failures.
+ */
+private final boolean failOnError;
+
+private long inflightRequests = 0;
+
+private final MailboxExecutor mailboxExecutor;
+private final Counter numBytesOutCounter;
+private final Counter numRecordsOutCounter;
+private final Counter numRecordsOutErrorCounter;
+private final SerializationSchema serializationSchema;
+
+PubSubWriter(
+String projectId,
+String topicId,
+SerializationSchema serializationSchema,
+WriterInitContext context,
+GcpPublisherConfig publisherConfig,
+long maximumInflightRequests,
+boolean failOnError)
+throws IOException {
+this(
+createPublisher(projectId, topicId, publisherConfig),
+context,
+serializationSchema,
+maximumInflightRequests,
+failOnError);
+}
+
+@VisibleForTesting
+PubSubWriter(
+GcpWriterClient publisher,
+WriterInitContext context,
+SerializationSchema serializationSchema,
+long maximumInflightRequests,
+boolean failOnError) {
+this.publisher = Preconditions.checkNotNull(publisher);
+this.serializationSchema = 

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601908776


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id)
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;
+
+public PubSubSinkV2Builder setProjectId(String projectId) {
+this.projectId = projectId;
+return this;
+}
+
+public PubSubSinkV2Builder setTopicId(String topicId) {
+this.topicId = topicId;
+return this;
+}
+
+public PubSubSinkV2Builder setSerializationSchema(
+SerializationSchema serializationSchema) {
+this.serializationSchema = serializationSchema;
+return this;
+}
+
+public PubSubSinkV2Builder setGcpPublisherConfig(GcpPublisherConfig 
gcpPublisherConfig) {
+this.gcpPublisherConfig = gcpPublisherConfig;
+return this;
+}
+
+public PubSubSinkV2Builder setNumMaxInflightRequests(int 
numMaxInflightRequests) {
+this.numMaxInflightRequests = numMaxInflightRequests;

Review Comment:
   What happens if we pass a negative number?
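
One possible way to make the behaviour explicit is to reject non-positive values in the setter. The sketch below is an assumption about how that could look, not what the PR does; the class name is hypothetical, and it uses a plain `IllegalArgumentException` to stay self-contained (in Flink code, `Preconditions.checkArgument` would be the usual choice).

```java
// Hypothetical, simplified builder; only the validation logic is of interest here.
class InflightLimitValidationSketch {
    // Boxed so that "not set" stays distinguishable from any valid value.
    private Integer numMaxInflightRequests;

    InflightLimitValidationSketch setNumMaxInflightRequests(int numMaxInflightRequests) {
        // Assumption: zero or negative limits are treated as a configuration error.
        if (numMaxInflightRequests <= 0) {
            throw new IllegalArgumentException(
                    "numMaxInflightRequests must be positive, but was " + numMaxInflightRequests);
        }
        this.numMaxInflightRequests = numMaxInflightRequests;
        return this;
    }

    Integer getNumMaxInflightRequests() {
        return numMaxInflightRequests;
    }
}
```

Failing fast in the builder surfaces a bad value at job construction time rather than when the writer starts blocking.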






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601907967


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on 
{@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and 
blocks new writes if the
+ *  number of inflight requests exceeds the specified limit.
+ *
+ * @param  The type of the records .
+ */
+@Internal
+public class PubSubWriter implements SinkWriter {
+
+/** The PubSub generic client to publish messages. */
+private final GcpWriterClient publisher;
+
+/**
+ * The maximum number of inflight requests, The writer blocks new writes 
if the number of
+ * inflight requests exceeds the specified limit.
+ */
+private final long maximumInflightRequests;

Review Comment:
   Can we align the types?
   Here it is `long`, while in `PubSubSinkV2Builder` it is `int`.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601895646


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.

Review Comment:
   ```suggestion
*   {@code maximumInflightMessages} for the maximum number of inflight 
messages.
   ```
   I would suggest we use the same word everywhere rather than its different
variations.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601877099


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {
+
+private static final int MAXIMUM_INFLIGHT_MESSAGES = 3;
+
+@Test
+void writeMessageDeliversMessageUsingClient() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+PubSubWriter writer =
+getDefaultWriter(client, new TestSinkInitContext(), 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+writer.write(message, null);
+client.deliverMessage(message);
+writer.flush(false);
+
+
Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message);
+}
+
+@Test
+void writeMessageIncrementsMetricsOnDelivery() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+WriterInitContext context = new TestSinkInitContext();
+
+PubSubWriter writer = getDefaultWriter(client, context, 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+
+// write message
+writer.write(message, null);
+
+// get metrics before delivery
+Counter numBytesOutCounter = 
context.metricGroup().getNumBytesSendCounter();
+Counter numRecordsSendCounter = 
context.metricGroup().getNumRecordsSendCounter();
+long recordsSentBeforeDelivery = numRecordsSendCounter.getCount();
+long bytesSentBeforeDelivery = numBytesOutCounter.getCount();
+
+// deliver message
+client.deliverMessage(message);
+writer.flush(false);
+
+long messageSize =
+PubsubMessage.newBuilder()
+.setData(ByteString.copyFromUtf8(message))
+.build()
+.getSerializedSize();
+
+Assertions.assertThat(recordsSentBeforeDelivery).isEqualTo(0);
+Assertions.assertThat(bytesSentBeforeDelivery).isEqualTo(0);
+Assertions.assertThat(numRecordsSendCounter.getCount()).isEqualTo(1);
+
Assertions.assertThat(numBytesOutCounter.getCount()).isEqualTo(messageSize);

Review Comment:
   Same here: please use a `static import` for `Assertions`.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876558


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {
+
+private static final int MAXIMUM_INFLIGHT_MESSAGES = 3;
+
+@Test
+void writeMessageDeliversMessageUsingClient() throws IOException, 
InterruptedException {
+TestGcpWriterClient client = new TestGcpWriterClient();
+PubSubWriter writer =
+getDefaultWriter(client, new TestSinkInitContext(), 
MAXIMUM_INFLIGHT_MESSAGES);
+String message = "test-message";
+writer.write(message, null);
+client.deliverMessage(message);
+writer.flush(false);
+
+
Assertions.assertThat(client.getDeliveredMessages()).containsExactly(message);

Review Comment:
   Please use a `static import` for `Assertions` here.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601876167


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriterTest.java:
##
@@ -0,0 +1,245 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.base.sink.writer.TestSinkInitContext;
+import 
org.apache.flink.connector.base.sink.writer.TestSinkInitContextAnyThreadMailbox;
+import org.apache.flink.connector.gcp.pubsub.sink.util.TestGcpWriterClient;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import io.grpc.StatusRuntimeException;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.concurrent.CountDownLatch;
+
+/** Tests for {@link PubSubWriter}. */
+public class PubSubWriterTest {

Review Comment:
   ```suggestion
   class PubSubWriterTest {
   ```






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601875238


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java:
##
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/** Tests for {@link PubSubSinkV2Builder}. */
+public class PubSubSinkV2BuilderTest {

Review Comment:
   ```suggestion
   class PubSubSinkV2BuilderTest {
   ```
   JUnit 5 doesn't need test classes to be `public`.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601874397


##
flink-connector-gcp-pubsub/src/test/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2BuilderTest.java:
##
@@ -0,0 +1,133 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import com.google.api.gax.core.NoCredentialsProvider;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/** Tests for {@link PubSubSinkV2Builder}. */
+public class PubSubSinkV2BuilderTest {
+
+@Test
+void builderBuildsSinkWithCorrectProperties() {
+PubSubSinkV2Builder builder = PubSubSinkV2.builder();
+GcpPublisherConfig gcpPublisherConfig =
+GcpPublisherConfig.builder()
+.setCredentialsProvider(NoCredentialsProvider.create())
+.build();
+
+SerializationSchema serializationSchema = new 
SimpleStringSchema();
+
+builder.setProjectId("test-project-id")
+.setTopicId("test-topic-id")
+.setGcpPublisherConfig(gcpPublisherConfig)
+.setSerializationSchema(serializationSchema)
+.setNumMaxInflightRequests(10)
+.setFailOnError(true);
+PubSubSinkV2 sink = builder.build();
+
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("projectId", 
"test-project-id");
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("topicId", 
"test-topic-id");
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("serializationSchema", 
serializationSchema);
+Assertions.assertThat(sink)
+.hasFieldOrPropertyWithValue("publisherConfig", 
gcpPublisherConfig);
+
Assertions.assertThat(sink).hasFieldOrPropertyWithValue("maxInFlightRequests", 
10);
+Assertions.assertThat(sink).hasFieldOrPropertyWithValue("failOnError", 
true);

Review Comment:
   Normally in Flink code we use a `static import` for `Assertions`; I think we
should follow the same convention here.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601862072


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@code 
PubSubSinkV2Builder}.builder()
+ * .setProjectId("project-id")
+ * .setTopicId("topic-id)
+ * .setGcpPublisherConfig(gcpPublisherConfig)
+ * .setSerializationSchema(new SimpleStringSchema())
+ * .setMaximumInflightMessages(10)
+ * .setFailOnError(true)
+ * .build();
+ *
+ * }
+ *
+ * @param 
+ */
+@PublicEvolving
+public class PubSubSinkV2Builder {
+private String projectId;
+private String topicId;
+private SerializationSchema serializationSchema;
+private GcpPublisherConfig gcpPublisherConfig;
+private Integer numMaxInflightRequests;
+private Boolean failOnError;

Review Comment:
   Why are they not primitives?
   I see that these values are set through setters whose arguments are primitives,
   and in `PubSubSinkV2` they are also primitives.
   Or did I miss something?
   






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


snuyanzin commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601857522


##
.github/workflows/weekly.yml:
##
@@ -37,10 +37,6 @@ jobs:
   flink: 1.19-SNAPSHOT,
   jdk: '8, 11, 17, 21',
   branch: main
-}, {
-  flink: 1.18-SNAPSHOT,
-  jdk: '8, 11, 17',
-  branch: main

Review Comment:
   Why do we remove 1.18 here but not in the PR flow?






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601406910


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java:
##
@@ -62,6 +62,7 @@
  *
  * @param  type of PubSubSink messages to write
  */
+@Deprecated
 public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {

Review Comment:
   We can remove this when we release a new major version `4.0`. Once this is 
merged, we can follow up with another Jira to remove it.






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


vahmed-hamdy commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601405551


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@link 
PubSubSinkV2Builder}.builder()

Review Comment:
   Good catch!






Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


morazow commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2112125712

   Thanks @vahmed-hamdy, great work!
   
   Added some minor suggestions, and a point about the number of retries on failure.





Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-15 Thread via GitHub


morazow commented on code in PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601341788


##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java:
##
@@ -0,0 +1,93 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+
+import java.util.Optional;
+
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR;
+import static 
org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS;
+
+/**
+ * A builder for creating a {@link PubSubSinkV2}.
+ *
+ * The builder uses the following parameters to build a {@link 
PubSubSinkV2}:
+ *
+ * 
+ *   {@link GcpPublisherConfig} for the {@link 
com.google.cloud.pubsub.v1.Publisher}
+ *   configuration.
+ *   {@link SerializationSchema} for serializing the input data.
+ *   {@code projectId} for the name of the project where the topic is 
located.
+ *   {@code topicId} for the name of the topic to send messages to.
+ *   {@code maximumInflightMessages} for the maximum number of in-flight 
messages.
+ *   {@code failOnError} for whether to fail on an error.
+ * 
+ *
+ * It can be used as follows:
+ *
+ * {@code
+ * PubSubSinkV2Builder pubSubSink = {@link 
PubSubSinkV2Builder}.builder()

Review Comment:
   Should the `{@link ` be removed here? Since it is already in a code block, 
it may not render.
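
   For reference, Javadoc does not interpret nested inline tags inside `{@code ...}`, so a `{@link ...}` there would show up literally. A minimal sketch of the usual layout, with the sample wrapped in `<pre>{@code ...}</pre>` and the class named in plain text; `ExampleSinkBuilder` is a hypothetical class, not the builder from this PR:

```java
/**
 * A hypothetical builder used only to illustrate the Javadoc layout.
 *
 * <p>It can be used as follows:
 *
 * <pre>{@code
 * ExampleSinkBuilder builder = ExampleSinkBuilder.builder()
 *     .setTopicId("topic-id");
 * }</pre>
 */
class ExampleSinkBuilder {
    private String topicId;

    static ExampleSinkBuilder builder() {
        return new ExampleSinkBuilder();
    }

    ExampleSinkBuilder setTopicId(String topicId) {
        this.topicId = topicId;
        return this;
    }

    String getTopicId() {
        return topicId;
    }
}
```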



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2.java:
##
@@ -0,0 +1,96 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.Sink;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * A RabbitMQ {@link Sink} to produce data into Gcp PubSub. The sink uses the 
{@link

Review Comment:
   ```suggestion
* A {@link Sink} to produce data into Gcp PubSub. The sink uses the {@link
   ```



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java:
##
@@ -62,6 +62,7 @@
  *
  * @param  type of PubSubSink messages to write
  */
+@Deprecated
 public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {

Review Comment:
    



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic 
{@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on 
{@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and 
blocks new writes if the
+ * number of inflight requests.

Review Comment:
   The sentence looks unfinished. Perhaps it should end with `... exceeds that number`?
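
   To make the intended behaviour concrete, here is a minimal sketch of an in-flight limit as the Javadoc describes it: new writes block once `maxInFlightRequests` sends are outstanding, and `flush()` waits for all of them. It uses a plain `java.util.concurrent.Semaphore` and `CompletableFuture` instead of the `MailboxExecutor` and `ApiFuture` imported in the PR, so it is an illustration of the idea and not the writer's actual implementation; `InflightLimiterSketch` and its methods are hypothetical names:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Semaphore;

/** Hypothetical sketch of an in-flight limit; not the PR's PubSubWriter. */
class InflightLimiterSketch {
    private final int maxInFlightRequests;
    private final Semaphore permits;

    InflightLimiterSketch(int maxInFlightRequests) {
        this.maxInFlightRequests = maxInFlightRequests;
        this.permits = new Semaphore(maxInFlightRequests);
    }

    /** Blocks while maxInFlightRequests sends are outstanding, then tracks the new one. */
    void write(CompletableFuture<?> pendingSend) {
        permits.acquireUninterruptibly();
        // Release the permit on success or failure so a failed send cannot leak it.
        pendingSend.whenComplete((result, error) -> permits.release());
    }

    /** Blocks until every tracked send has completed, as flush() and close() would. */
    void flush() {
        permits.acquireUninterruptibly(maxInFlightRequests);
        permits.release(maxInFlightRequests);
    }

    /** Number of sends that have been written but not yet completed. */
    int inFlight() {
        return maxInFlightRequests - permits.availablePermits();
    }
}
```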



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/streaming/connectors/gcp/pubsub/PubSubSink.java:
##
@@ -62,6 +62,7 @@
  *
  * @param  type of PubSubSink messages to write
  */
+@Deprecated
 public class PubSubSink extends RichSinkFunction implements 
CheckpointedFunction {

Review Comment:
   When would be a good time to remove this API?



##
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##
@@ -0,0 +1,195 @@
+package 

Re: [PR] [FLINK-24298] Add GCP PubSub Sink API Implementation, bump Flink version to 1.19.0 [flink-connector-gcp-pubsub]

2024-05-14 Thread via GitHub


vahmed-hamdy commented on PR #27:
URL: 
https://github.com/apache/flink-connector-gcp-pubsub/pull/27#issuecomment-2111082697

   @snuyanzin would you mind taking a look? We want to complete this migration 
since `SinkFunction` is deprecated.

