This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.10 by this push: new c4e7baa [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread c4e7baa is described below commit c4e7baaedab0be13c08b9fc0b3e6bef9d8668791 Author: Thomas Weise <t...@apache.org> AuthorDate: Tue Dec 17 14:56:43 2019 -0800 [FLINK-15301] [kinesis] Exception propagation from record emitter to source thread --- .../kinesis/internals/KinesisDataFetcher.java | 14 +++++++++++- .../connectors/kinesis/util/RecordEmitter.java | 13 ++++++----- .../kinesis/FlinkKinesisConsumerTest.java | 26 +++++++++++++++++++++- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java index afd6138..52d6293 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java @@ -497,7 +497,19 @@ public class KinesisDataFetcher<T> { Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS))); // run record emitter in separate thread since main thread is used for discovery - Thread thread = new Thread(this.recordEmitter); + Runnable recordEmitterRunnable = new Runnable() { + @Override + public void run() { + try { + recordEmitter.run(); + } catch (Throwable error) { + // report the error that terminated the emitter loop to source thread + stopWithError(error); + } + } + }; + + Thread thread = new Thread(recordEmitterRunnable); thread.setName("recordEmitter-" + runtimeContext.getTaskNameWithSubtasks()); thread.setDaemon(true); thread.start(); diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java index 95c3688..dfa3388 100644 --- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java +++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java @@ -170,7 +170,14 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna @Override public void run() { LOG.info("Starting emitter with maxLookaheadMillis: {}", this.maxLookaheadMillis); + emitRecords(); + } + + public void stop() { + running = false; + } + protected void emitRecords() { // emit available records, ordered by timestamp AsyncRecordQueue<T> min = heads.poll(); runLoop: @@ -248,12 +255,8 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna } } - public void stop() { - running = false; - } - /** Emit the record. This is specific to a connector, like the Kinesis data fetcher. */ - public abstract void emit(T record, RecordQueue<T> source); + protected abstract void emit(T record, RecordQueue<T> source); /** Summary of emit queues that can be used for logging. 
*/ public String printInfo() { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java index 1a910cb..d2d637c 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java @@ -75,6 +75,7 @@ import org.powermock.api.mockito.PowerMockito; import org.powermock.core.classloader.annotations.PrepareForTest; import org.powermock.modules.junit4.PowerMockRunner; +import java.io.Serializable; import java.time.Duration; import java.util.ArrayList; import java.util.Collections; @@ -86,6 +87,7 @@ import java.util.UUID; import java.util.concurrent.BlockingQueue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.function.Supplier; @@ -920,6 +922,7 @@ public class FlinkKinesisConsumerTest extends TestLogger { testHarness.open(); final ConcurrentLinkedQueue<Object> results = testHarness.getOutput(); + final AtomicBoolean throwOnCollect = new AtomicBoolean(); @SuppressWarnings("unchecked") SourceFunction.SourceContext<String> sourceContext = new CollectingSourceContext( @@ -929,11 +932,20 @@ public class FlinkKinesisConsumerTest extends TestLogger { } @Override + public void collect(Serializable element) { + if (throwOnCollect.get()) { + throw new RuntimeException("expected"); + } + super.collect(element); + } + + @Override public void emitWatermark(Watermark mark) { results.add(mark); } }; + final AtomicReference<Exception> 
sourceThreadError = new AtomicReference<>(); new Thread( () -> { try { @@ -941,7 +953,7 @@ public class FlinkKinesisConsumerTest extends TestLogger { } catch (InterruptedException e) { // expected on cancel } catch (Exception e) { - throw new RuntimeException(e); + sourceThreadError.set(e); } }) .start(); @@ -999,6 +1011,18 @@ public class FlinkKinesisConsumerTest extends TestLogger { expectedResults.add(new Watermark(3000)); assertThat(results, org.hamcrest.Matchers.contains(expectedResults.toArray())); + // verify exception propagation + throwOnCollect.set(true); + shard1.put(Long.toString(record2 + 1)); + + Assert.assertNull(sourceThreadError.get()); + deadline = Deadline.fromNow(Duration.ofSeconds(10)); + while (deadline.hasTimeLeft() && sourceThreadError.get() == null) { + Thread.sleep(10); + } + Assert.assertNotNull(sourceThreadError.get()); + Assert.assertNotNull("expected", sourceThreadError.get().getMessage()); + sourceFunc.cancel(); testHarness.close(); }