vahmed-hamdy commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1602973242
########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubWriter.java: ########## @@ -0,0 +1,195 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.api.connector.sink2.SinkWriter; +import org.apache.flink.api.connector.sink2.WriterInitContext; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; +import org.apache.flink.metrics.Counter; +import org.apache.flink.util.FlinkRuntimeException; +import org.apache.flink.util.Preconditions; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + +import java.io.IOException; +import java.util.Optional; + +import static org.apache.flink.util.concurrent.Executors.directExecutor; + +/** + * A stateless {@link SinkWriter} that writes records to Pub/Sub using generic {@link + * GcpWriterClient}. The writer blocks on completion of inflight requests on {@code flush()} and + * {@code close()}. The writer also uses {@code maxInFlightRequests} and blocks new writes if the + * number of inflight requests exceeds the specified limit. + * + * @param <T> The type of the records . + */ +@Internal +public class PubSubWriter<T> implements SinkWriter<T> { + + /** The PubSub generic client to publish messages. */ + private final GcpWriterClient<PubsubMessage> publisher; + + /** + * The maximum number of inflight requests, The writer blocks new writes if the number of + * inflight requests exceeds the specified limit. + */ + private final long maximumInflightRequests; Review Comment: Agreed, will use `int` for both -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org