zentol commented on code in PR #19680: URL: https://github.com/apache/flink/pull/19680#discussion_r873552991
########## flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/batch/connectors/cassandra/CassandraOutputFormatBase.java: ########## @@ -17,130 +17,191 @@ package org.apache.flink.batch.connectors.cassandra; +import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.api.common.io.RichOutputFormat; import org.apache.flink.configuration.Configuration; +import org.apache.flink.connectors.cassandra.utils.SinkUtils; import org.apache.flink.streaming.connectors.cassandra.ClusterBuilder; import org.apache.flink.util.Preconditions; import com.datastax.driver.core.Cluster; -import com.datastax.driver.core.PreparedStatement; -import com.datastax.driver.core.ResultSet; -import com.datastax.driver.core.ResultSetFuture; import com.datastax.driver.core.Session; -import com.google.common.base.Strings; import com.google.common.util.concurrent.FutureCallback; import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; +import java.time.Duration; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; /** - * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra. + * CassandraOutputFormatBase is the common abstract class for writing into Apache Cassandra using + * output formats. * * @param <OUT> Type of the elements to write. 
*/ -public abstract class CassandraOutputFormatBase<OUT> extends RichOutputFormat<OUT> { +public abstract class CassandraOutputFormatBase<OUT, V> extends RichOutputFormat<OUT> { private static final Logger LOG = LoggerFactory.getLogger(CassandraOutputFormatBase.class); - private final String insertQuery; private final ClusterBuilder builder; + private Semaphore semaphore; + private Duration maxConcurrentRequestsTimeout = Duration.ofMillis(Long.MAX_VALUE); + private int maxConcurrentRequests = Integer.MAX_VALUE; private transient Cluster cluster; - private transient Session session; - private transient PreparedStatement prepared; - private transient FutureCallback<ResultSet> callback; - private transient Throwable exception = null; + protected transient Session session; + private transient FutureCallback<V> callback; + private AtomicReference<Throwable> throwable; - public CassandraOutputFormatBase(String insertQuery, ClusterBuilder builder) { - Preconditions.checkArgument( - !Strings.isNullOrEmpty(insertQuery), "Query cannot be null or empty"); + public CassandraOutputFormatBase(ClusterBuilder builder) { Preconditions.checkNotNull(builder, "Builder cannot be null"); - - this.insertQuery = insertQuery; this.builder = builder; } + /** + * Sets the maximum allowed number of concurrent requests for this output format. + * + * @param maxConcurrentRequestsTimeout timeout duration when acquiring a permit to execute + */ + public void setMaxConcurrentRequestsTimeout(Duration maxConcurrentRequestsTimeout) { Review Comment: These shouldn't be setters; they should be constructor arguments. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org