This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.9
in repository https://gitbox.apache.org/repos/asf/flink.git

The following commit(s) were added to refs/heads/release-1.9 by this push:
     new 22571aa [FLINK-13059][cassandra] Release semaphore on exception in send()
22571aa is described below

commit 22571aab57fc30450de8a850f1a8a6ea80fdba2c
Author: Mads Chr. Olesen <m...@trackunit.com>
AuthorDate: Tue Sep 3 11:49:10 2019 +0200

    [FLINK-13059][cassandra] Release semaphore on exception in send()
---
 .../connectors/cassandra/CassandraSinkBase.java    | 21 +++++++----
 .../cassandra/CassandraSinkBaseTest.java           | 42 ++++++++++++++++++++--
 2 files changed, 54 insertions(+), 9 deletions(-)

diff --git a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
index ede5586..0e7eb6f 100644
--- a/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
+++ b/flink-connectors/flink-connector-cassandra/src/main/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBase.java
@@ -128,8 +128,14 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
     @Override
     public void invoke(IN value) throws Exception {
         checkAsyncErrors();
-        tryAcquire();
-        final ListenableFuture<V> result = send(value);
+        tryAcquire(1);
+        final ListenableFuture<V> result;
+        try {
+            result = send(value);
+        } catch (Exception e) {
+            semaphore.release();
+            throw e;
+        }
         Futures.addCallback(result, callback);
     }
 
@@ -139,11 +145,12 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
 
     public abstract ListenableFuture<V> send(IN value);
 
-    private void tryAcquire() throws InterruptedException, TimeoutException {
-        if (!semaphore.tryAcquire(config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
+    private void tryAcquire(int permits) throws InterruptedException, TimeoutException {
+        if (!semaphore.tryAcquire(permits, config.getMaxConcurrentRequestsTimeout().toMillis(), TimeUnit.MILLISECONDS)) {
             throw new TimeoutException(
                 String.format(
-                    "Failed to acquire 1 permit of %d to send value in %s.",
+                    "Failed to acquire %d out of %d permits to send value in %s.",
+                    permits,
                     config.getMaxConcurrentRequests(),
                     config.getMaxConcurrentRequestsTimeout()
                 )
@@ -158,8 +165,8 @@ public abstract class CassandraSinkBase<IN, V> extends RichSinkFunction<IN> impl
         }
     }
 
-    private void flush() {
-        semaphore.acquireUninterruptibly(config.getMaxConcurrentRequests());
+    private void flush() throws InterruptedException, TimeoutException {
+        tryAcquire(config.getMaxConcurrentRequests());
         semaphore.release(config.getMaxConcurrentRequests());
     }
 
diff --git a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
index 2b705a5..b4406ab 100644
--- a/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
+++ b/flink-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraSinkBaseTest.java
@@ -28,6 +28,7 @@ import org.apache.flink.util.Preconditions;
 import com.datastax.driver.core.Cluster;
 import com.datastax.driver.core.ResultSet;
 import com.datastax.driver.core.Session;
+import com.datastax.driver.core.exceptions.InvalidQueryException;
 import com.datastax.driver.core.exceptions.NoHostAvailableException;
 import com.google.common.util.concurrent.ListenableFuture;
 import org.junit.Assert;
@@ -180,7 +181,7 @@ public class CassandraSinkBaseTest {
                 }
             };
             t.start();
-            while (t.getState() != Thread.State.WAITING) {
+            while (t.getState() != Thread.State.TIMED_WAITING) {
                 Thread.sleep(5);
             }
 
@@ -212,7 +213,7 @@ public class CassandraSinkBaseTest {
                 }
             };
             t.start();
-            while (t.getState() != Thread.State.WAITING) {
+            while (t.getState() != Thread.State.TIMED_WAITING) {
                 Thread.sleep(5);
             }
 
@@ -273,6 +274,26 @@ public class CassandraSinkBaseTest {
     }
 
     @Test(timeout = DEFAULT_TEST_TIMEOUT)
+    public void testReleaseOnSendException() throws Exception {
+        final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
+            .setMaxConcurrentRequests(1)
+            .build();
+
+        try (TestCassandraSink testCassandraSink = createOpenedSendExceptionTestCassandraSink(config)) {
+            Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+            Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+
+            try {
+                testCassandraSink.invoke("N/A");
+            } catch (Exception e) {
+                Assert.assertTrue(e instanceof InvalidQueryException);
+                Assert.assertEquals(1, testCassandraSink.getAvailablePermits());
+                Assert.assertEquals(0, testCassandraSink.getAcquiredPermits());
+            }
+        }
+    }
+
+    @Test(timeout = DEFAULT_TEST_TIMEOUT)
     public void testTimeoutExceptionOnInvoke() throws Exception {
         final CassandraSinkBaseConfig config = CassandraSinkBaseConfig.newBuilder()
             .setMaxConcurrentRequests(1)
@@ -331,6 +352,12 @@ public class CassandraSinkBaseTest {
         return testHarness;
     }
 
+    private TestCassandraSink createOpenedSendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+        final TestCassandraSink testCassandraSink = new SendExceptionTestCassandraSink(config);
+        testCassandraSink.open(new Configuration());
+        return testCassandraSink;
+    }
+
     private static class TestCassandraSink extends CassandraSinkBase<String, ResultSet> implements AutoCloseable {
 
         private static final ClusterBuilder builder;
@@ -379,4 +406,15 @@ public class CassandraSinkBaseTest {
             resultSetFutures.offer(ResultSetFutures.fromCompletableFuture(completableFuture));
         }
     }
+
+    private static class SendExceptionTestCassandraSink extends TestCassandraSink {
+        SendExceptionTestCassandraSink(CassandraSinkBaseConfig config) {
+            super(config, new NoOpCassandraFailureHandler());
+        }
+
+        @Override
+        public ListenableFuture<ResultSet> send(String value) {
+            throw new InvalidQueryException("For test purposes");
+        }
+    }
 }