[ 
https://issues.apache.org/jira/browse/FLINK-9083?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16668081#comment-16668081
 ] 

ASF GitHub Bot commented on FLINK-9083:
---------------------------------------

jparkie commented on a change in pull request #6782: [FLINK-9083][Cassandra Connector] Add async backpressure support to Cassandra Connector
URL: https://github.com/apache/flink/pull/6782#discussion_r229168212
 
 

 ##########
 File path: flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
 ##########
 @@ -43,70 +48,82 @@
  */
 public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> implements CheckpointedFunction {
        protected final Logger log = LoggerFactory.getLogger(getClass());
-       protected transient Cluster cluster;
-       protected transient Session session;
 
-       protected transient volatile Throwable exception;
-       protected transient FutureCallback<V> callback;
+       // ------------------------ Default Configurations ------------------------
+
+       /**
+        * The default maximum number of concurrent requests. By default, {@code Integer.MAX_VALUE}.
+        */
+       public static final int DEFAULT_MAX_CONCURRENT_REQUESTS = Integer.MAX_VALUE;
+
+       /**
+        * The default timeout duration when acquiring a permit to execute. By default, {@code Long.MAX_VALUE}.
+        */
+       public static final long DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT = Long.MAX_VALUE;
+
+       /**
+        * The default timeout unit when acquiring a permit to execute. By default, milliseconds.
+        */
+       public static final TimeUnit DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT = TimeUnit.MILLISECONDS;
+
+       // ------------------------- Configuration Fields -------------------------
+
+       private int maxConcurrentRequests = DEFAULT_MAX_CONCURRENT_REQUESTS;
+       private long maxConcurrentRequestsTimeout = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT;
+       private TimeUnit maxConcurrentRequestsTimeoutUnit = DEFAULT_MAX_CONCURRENT_REQUESTS_TIMEOUT_UNIT;
+
+       // --------------------------- Cassandra Fields ---------------------------
 
        private final ClusterBuilder builder;
 
-       private final AtomicInteger updatesPending = new AtomicInteger();
+       protected transient Cluster cluster;
+       protected transient Session session;
+
+       // ------------------------ Synchronization Fields ------------------------
+
+       private AtomicReference<Throwable> throwable;
+       private Semaphore semaphore;
+       private Phaser phaser;
 
        CassandraSinkBase(ClusterBuilder builder) {
                this.builder = builder;
                ClosureCleaner.clean(builder, true);
        }
 
-       @Override
-       public void open(Configuration configuration) {
-               this.callback = new FutureCallback<V>() {
-                       @Override
-                       public void onSuccess(V ignored) {
-                               int pending = updatesPending.decrementAndGet();
-                               if (pending == 0) {
-                                       synchronized (updatesPending) {
-                                               updatesPending.notifyAll();
-                                       }
-                               }
-                       }
+       // ----------------------------- Sink Methods -----------------------------
 
+       @Override
+       public void open(Configuration parameters) {
+               cluster = createCluster();
+               session = createSession();
+
+               throwable = new AtomicReference<>();
+               semaphore = new Semaphore(maxConcurrentRequests);
+               /*
+                * A Phaser is a flexible and reusable synchronization barrier similar to CyclicBarrier and CountDownLatch.
+                *
+                * This Phaser is configured to support "1 + N" parties.
+                *   - "1" for the CassandraSinkBase to arrive and to await at the Phaser during a flush() call.
+                *   - "N" for the varying number of invoke() calls that register and de-register with the Phaser.
+                *
+                * The Phaser awaits the completion of the advancement of a phase prior to returning from a register() call.
+                * This behavior ensures that no backlogged invoke() calls register to execute while the Semaphore's permits
+                * are being released during a flush() call.
+                */
+               phaser = new Phaser(1) {
 
 Review comment:
   That's true; a concurrent set of `Future`s would be simpler. I've seen 
similar code in other OSS projects that relies on a concurrent `TrieMap` to 
track all the `pendingFutures` before a snapshot. At the snapshot, it iterates 
over the `pendingFutures` and calls `get()` on each, which blocks until the 
future either returns or throws an exception. However, with Spark the `Future`s 
are created in large batches, so this approach may have caveats given the 
streaming nature of Flink.
   
   In Java, we could use a set backed by a `ConcurrentHashMap`. Since we rely 
on a semaphore to throttle, the size of the set is bounded by the number of 
permits, so memory consumption is deterministic. And since we can set the 
`concurrencyLevel` of a `ConcurrentHashMap` based on the underlying driver's 
parallelism, we should also have minimal contention on `put()`. The only 
feature we would miss, given the streaming and back-pressured nature of Flink, 
is the `Phaser` preventing `invoke()` calls backed up on the semaphore from 
delaying the `flush()`. By introducing a flush lock alongside the 
`ConcurrentHashMap`, and with enough testing, it should be feasible to replace 
the `Phaser`. What do you think?
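
   The alternative described above could look roughly like the following. 
This is only a sketch under assumptions, not code from this pull request: the 
class `PendingWrites` and its methods are hypothetical names, a 
`CompletableFuture` stands in for the driver's future type, and a 
`ReentrantReadWriteLock` is one possible choice for the "flush lock". Writers 
take the lock only after they hold a permit, so calls that are still waiting on 
the semaphore do not hold up a flush.

```java
import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Supplier;

// Hypothetical sketch: all names here are illustrative, not taken from the PR.
class PendingWrites {
    private final Semaphore permits;
    // Concurrent set backed by a ConcurrentHashMap; its size never exceeds the permit count.
    private final Set<CompletableFuture<?>> pending = ConcurrentHashMap.newKeySet();
    private final AtomicReference<Throwable> failure = new AtomicReference<>();
    // Assumed "flush lock": writers share the read lock, flush() holds the write lock.
    private final ReentrantReadWriteLock flushLock = new ReentrantReadWriteLock();

    PendingWrites(int maxConcurrentRequests) {
        this.permits = new Semaphore(maxConcurrentRequests);
    }

    /** Blocks until a permit is free, then issues the write and tracks its future. */
    void submit(Supplier<CompletableFuture<?>> write) {
        permits.acquireUninterruptibly();
        CompletableFuture<?> future = issue(write);
        // Registered after pending.add(), so the removal always follows the insertion.
        future.whenComplete((ignored, error) -> {
            if (error != null) {
                failure.compareAndSet(null, error);
            }
            pending.remove(future);
            permits.release();
        });
    }

    private CompletableFuture<?> issue(Supplier<CompletableFuture<?>> write) {
        // Taken only once a permit is held, so writers blocked on the semaphore never delay flush().
        flushLock.readLock().lock();
        try {
            CompletableFuture<?> future = write.get();
            pending.add(future);
            return future;
        } catch (RuntimeException e) {
            permits.release(); // the write was never issued, so give the permit back
            throw e;
        } finally {
            flushLock.readLock().unlock();
        }
    }

    /** Waits for every tracked write, then rethrows the first recorded failure, if any. */
    void flush() {
        flushLock.writeLock().lock();
        try {
            for (CompletableFuture<?> future : pending) {
                try {
                    future.join();
                } catch (CompletionException | CancellationException e) {
                    failure.compareAndSet(null, e.getCause() != null ? e.getCause() : e);
                }
            }
            Throwable error = failure.getAndSet(null);
            if (error != null) {
                throw new IllegalStateException("Asynchronous write failed", error);
            }
        } finally {
            flushLock.writeLock().unlock();
        }
    }

    int availablePermits() {
        return permits.availablePermits();
    }
}
```

   In this sketch a sink would call `submit(...)` from `invoke()` and `flush()` 
from its snapshot method. The timeout settings from the diff are left out to 
keep the example short.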
   
   To your last comment, I chose `Phaser` because the code originally used 
`wait` and `notify`, and I really like what Effective Java recommends: "Item 
69: Prefer concurrency utilities to wait and notify". I was thinking of moving 
this logic into an abstract component, since the Elasticsearch, Kafka, and many 
other sinks replicate a variant of the `wait` and `notify` logic, but I thought 
that was beyond the scope of this ticket. My plan is to finish this ticket and 
then open a follow-up suggesting we generalize this logic for all other sinks.
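
   For readers unfamiliar with `Phaser`, the "1 + N" pattern from the diff can 
be reduced to a few lines. The sketch below is a simplified illustration with 
hypothetical names (`InFlightTracker` and its methods); it leaves out the 
semaphore and the overridden `Phaser` behavior that the pull request adds, and 
only shows how a `Phaser` can stand in for a hand-written `wait`/`notify` 
counter.

```java
import java.util.concurrent.Phaser;

// Hypothetical sketch: a Phaser used as a "wait until nothing is in flight" barrier.
class InFlightTracker {
    // One party is registered up front: the "1" that represents the flushing caller.
    private final Phaser phaser = new Phaser(1);

    /** Called when a request is issued: registers one of the "N" parties. */
    void requestStarted() {
        phaser.register();
    }

    /** Called from the request's completion callback: arrives and leaves the Phaser. */
    void requestFinished() {
        phaser.arriveAndDeregister();
    }

    /** Blocks until every request that was started has finished. */
    void awaitAll() {
        // The flusher arrives; the phase advances once all registered requests have arrived too.
        phaser.arriveAndAwaitAdvance();
    }

    /** Number of requests currently registered, excluding the flusher itself. */
    int inFlight() {
        return phaser.getRegisteredParties() - 1;
    }
}
```

   Note that in this plain form a request registered while `awaitAll()` is 
blocked is also waited for, because `register()` adds another unarrived party 
to the current phase.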
   

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add async backpressure support to Cassandra Connector
> -----------------------------------------------------
>
>                 Key: FLINK-9083
>                 URL: https://issues.apache.org/jira/browse/FLINK-9083
>             Project: Flink
>          Issue Type: Improvement
>          Components: Cassandra Connector
>            Reporter: Jacob Park
>            Assignee: Jacob Park
>            Priority: Minor
>              Labels: pull-request-available
>
> Because the CassandraSinkBase derivatives use async writes, they never block 
> the task and therefore introduce no backpressure.
> I am currently using a semaphore to provide backpressure by blocking once a 
> maximum number of concurrent requests is reached, similar to how DataStax's 
> Spark Cassandra Connector works: 
> [https://github.com/datastax/spark-cassandra-connector/blob/v2.0.7/spark-cassandra-connector/src/main/scala/com/datastax/spark/connector/writer/AsyncExecutor.scala#L18]
> This change has greatly improved the fault tolerance of our Cassandra Sink 
> Connector implementation on Apache Flink in production. I would like to 
> contribute this feature back upstream.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
