vahmed-hamdy commented on code in PR #27:
URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602973242


##########
flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java:
##########
@@ -0,0 +1,195 @@
+package org.apache.flink.connector.gcp.pubsub.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.api.connector.sink2.SinkWriter;
+import org.apache.flink.api.connector.sink2.WriterInitContext;
+import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.util.FlinkRuntimeException;
+import org.apache.flink.util.Preconditions;
+
+import com.google.api.core.ApiFuture;
+import com.google.api.core.ApiFutureCallback;
+import com.google.api.core.ApiFutures;
+import com.google.cloud.pubsub.v1.Publisher;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.PubsubMessage;
+import com.google.pubsub.v1.TopicName;
+
+import java.io.IOException;
+import java.util.Optional;
+
+import static org.apache.flink.util.concurrent.Executors.directExecutor;
+
+/**
+ * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link
+ * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and
+ * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the
+ *  number of inflight requests exceeds the specified limit.
+ *
+ * @param <T> The type of the records .
+ */
+@Internal
+public class PubSubWriter<T> implements SinkWriter<T> {
+
+    /** The PubSub generic client to publish messages. */
+    private final GcpWriterClient<PubsubMessage> publisher;
+
+    /**
+     * The maximum number of inflight requests, The writer blocks new writes if the number of
+     * inflight requests exceeds the specified limit.
+     */
+    private final long maximumInflightRequests;

Review Comment:
   Agreed, will use `int` for both 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to