snuyanzin commented on code in PR #27: URL: https://github.com/apache/flink-connector-gcp-pubsub/pull/27#discussion_r1601908776
########## flink-connector-gcp-pubsub/src/main/java/org/apache/flink/connector/gcp/pubsub/sink/PubSubSinkV2Builder.java: ########## @@ -0,0 +1,93 @@ +package org.apache.flink.connector.gcp.pubsub.sink; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.serialization.SerializationSchema; +import org.apache.flink.connector.gcp.pubsub.sink.config.GcpPublisherConfig; + +import java.util.Optional; + +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_FAIL_ON_ERROR; +import static org.apache.flink.connector.gcp.pubsub.common.PubSubConstants.DEFAULT_MAXIMUM_INFLIGHT_REQUESTS; + +/** + * A builder for creating a {@link PubSubSinkV2}. + * + * <p>The builder uses the following parameters to build a {@link PubSubSinkV2}: + * + * <ul> + * <li>{@link GcpPublisherConfig} for the {@link com.google.cloud.pubsub.v1.Publisher} + * configuration. + * <li>{@link SerializationSchema} for serializing the input data. + * <li>{@code projectId} for the name of the project where the topic is located. + * <li>{@code topicId} for the name of the topic to send messages to. + * <li>{@code maximumInflightMessages} for the maximum number of in-flight messages. + * <li>{@code failOnError} for whether to fail on an error. 
+ * </ul> + * + * <p>It can be used as follows: + * + * <pre>{@code + * PubSubSinkV2Builder<String> pubSubSink = PubSubSinkV2Builder.<String>builder() + * .setProjectId("project-id") + * .setTopicId("topic-id") + * .setGcpPublisherConfig(gcpPublisherConfig) + * .setSerializationSchema(new SimpleStringSchema()) + * .setMaximumInflightMessages(10) + * .setFailOnError(true) + * .build(); + * + * }</pre> + * + * @param <T> + */ +@PublicEvolving +public class PubSubSinkV2Builder<T> { + private String projectId; + private String topicId; + private SerializationSchema<T> serializationSchema; + private GcpPublisherConfig gcpPublisherConfig; + private Integer numMaxInflightRequests; + private Boolean failOnError; + + public PubSubSinkV2Builder<T> setProjectId(String projectId) { + this.projectId = projectId; + return this; + } + + public PubSubSinkV2Builder<T> setTopicId(String topicId) { + this.topicId = topicId; + return this; + } + + public PubSubSinkV2Builder<T> setSerializationSchema( + SerializationSchema<T> serializationSchema) { + this.serializationSchema = serializationSchema; + return this; + } + + public PubSubSinkV2Builder<T> setGcpPublisherConfig(GcpPublisherConfig gcpPublisherConfig) { + this.gcpPublisherConfig = gcpPublisherConfig; + return this; + } + + public PubSubSinkV2Builder<T> setNumMaxInflightRequests(int numMaxInflightRequests) { + this.numMaxInflightRequests = numMaxInflightRequests; Review Comment: What happens if we pass a negative number? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org