This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 22571aa  [FLINK-13059][cassandra] Release semaphore on exception in send()
22571aa is described below

commit 22571aab57fc30450de8a850f1a8a6ea80fdba2c
Author: Mads Chr. Olesen <m...@trackunit.com>
AuthorDate: Tue Sep 3 11:49:10 2019 +0200

    [FLINK-13059][cassandra] Release semaphore on exception in send()
---
 .../connectors/cassandra/CassandraSinkBase.java    | 21 +++++++----
 .../cassandra/CassandraSinkBaseTest.java           | 42 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index ede5586..0e7eb6f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -128,8 +128,14 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
        @Override
        public void invoke(IN value) throws Exception {
                checkAsyncErrors();
-               tryAcquire();
-               final ListenableFuture<V> result = send(value);
+               tryAcquire(1);
+               final ListenableFuture<V> result;
+               try {
+                       result = send(value);
+               } catch (Exception e) {
+                       semaphore.release();
+                       throw e;
+               }
                Futures.addCallback(result, callback);
        }
 
@@ -139,11 +145,12 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 
        public abstract ListenableFuture<V> send(IN value);
 
-       private void tryAcquire() throws InterruptedException, TimeoutException {
-               if (!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
+       private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
+               if (!semaphore.tryAcquire(permits, config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
                        throw new TimeoutException(
                                String.format(
-                                       "Failed to acquire 1 permit of %d to send value in %s.",
+                                       "Failed to acquire %d out of %d permits to send value in %s.",
+                                       permits,
                                        config.getMaxConcurrentRequests(),
                                        config.getMaxConcurrentRequestsTimeout()
                                )
@@ -158,8 +165,8 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
                }
        }
 
-       private void flush() {
-               semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+       private void flush() throws InterruptedException, TimeoutException {
+               tryAcquire(config.getMaxConcurrentRequests());
                semaphore.release(config.getMaxConcurrentRequests());
        }
 
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 2b705a5..b4406ab 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -180,7 +181,7 @@ public class CassandraSinkBaseTest {
                                }
                        };
                        t.start();
-                       while (t.getState() != Thread.State.WAITING) {
+                       while (t.getState() != Thread.State.TIMED_WAITING) {
                                Thread.sleep(5);
                        }
 
@@ -212,7 +213,7 @@ public class CassandraSinkBaseTest {
                                }
                        };
                        t.start();
-                       while (t.getState() != Thread.State.WAITING) {
+                       while (t.getState() != Thread.State.TIMED_WAITING) {
                                Thread.sleep(5);
                        }
 
@@ -273,6 +274,26 @@ public class CassandraSinkBaseTest {
        }
 
        @Test(timeout = DEFAULT_TEST_TIMEOUT)
+       public void testReleaseOnSendException() throws Exception {
+               final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+                       .setMaxConcurrentRequests(1)
+                       .build();
+
+               try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
+                       Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+                       Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+
+                       try {
+                               testCassandraSink.invoke("N/A");
+                       } catch (Exception e) {
+                               Assert.assertTrue(e instanceof InvalidQueryException);
+                               Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+                               Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+                       }
+               }
+       }
+
+       @Test(timeout = DEFAULT_TEST_TIMEOUT)
        public void testTimeoutExceptionOnInvoke() throws Exception {
                final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
                        .setMaxConcurrentRequests(1)
@@ -331,6 +352,12 @@ public class CassandraSinkBaseTest {
                return testHarness;
        }
 
+       private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+               final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
+               testCassandraSink.open(new Configuration());
+               return testCassandraSink;
+       }
+
        private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
 
                private static final ClusterBuilder builder;
@@ -379,4 +406,15 @@ public class CassandraSinkBaseTest {
                        resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
                }
        }
+
+       private static class SendExceptionTestCassandraSink extends TestCassandraSink {
+               SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+                       super(config, new NoOpCassandraFailureHandler());
+               }
+
+               @Override
+               public ListenableFuture<ResultSet> send(String value) {
+                       throw new InvalidQueryException("For test purposes");
+               }
+       }
 }

Reply via email to