This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
     new c4e7baa  [FLINK-15301] [kinesis] Exception propagation from record 
emitter to source thread
c4e7baa is described below

commit c4e7baaedab0be13c08b9fc0b3e6bef9d8668791
Author: Thomas Weise <t...@apache.org>
AuthorDate: Tue Dec 17 14:56:43 2019 -0800

    [FLINK-15301] [kinesis] Exception propagation from record emitter to source 
thread
---
 .../kinesis/internals/KinesisDataFetcher.java      | 14 +++++++++++-
 .../connectors/kinesis/util/RecordEmitter.java     | 13 ++++++-----
 .../kinesis/FlinkKinesisConsumerTest.java          | 26 +++++++++++++++++++++-
 3 files changed, 46 insertions(+), 7 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
index afd6138..52d6293 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/KinesisDataFetcher.java
@@ -497,7 +497,19 @@ public class KinesisDataFetcher<T> {
                                        
Long.toString(ConsumerConfigConstants.DEFAULT_SHARD_IDLE_INTERVAL_MILLIS)));
 
                        // run record emitter in separate thread since main 
thread is used for discovery
-                       Thread thread = new Thread(this.recordEmitter);
+                       Runnable recordEmitterRunnable = new Runnable() {
+                               @Override
+                               public void run() {
+                                       try {
+                                               recordEmitter.run();
+                                       } catch (Throwable error) {
+                                               // report the error that 
terminated the emitter loop to source thread
+                                               stopWithError(error);
+                                       }
+                               }
+                       };
+
+                       Thread thread = new Thread(recordEmitterRunnable);
                        thread.setName("recordEmitter-" + 
runtimeContext.getTaskNameWithSubtasks());
                        thread.setDaemon(true);
                        thread.start();
diff --git 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 95c3688..dfa3388 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -170,7 +170,14 @@ public abstract class RecordEmitter<T extends 
TimestampedValue> implements Runna
        @Override
        public void run() {
                LOG.info("Starting emitter with maxLookaheadMillis: {}", 
this.maxLookaheadMillis);
+               emitRecords();
+       }
+
+       public void stop() {
+               running = false;
+       }
 
+       protected void emitRecords() {
                // emit available records, ordered by timestamp
                AsyncRecordQueue<T> min = heads.poll();
                runLoop:
@@ -248,12 +255,8 @@ public abstract class RecordEmitter<T extends 
TimestampedValue> implements Runna
                }
        }
 
-       public void stop() {
-               running = false;
-       }
-
        /** Emit the record. This is specific to a connector, like the Kinesis 
data fetcher. */
-       public abstract void emit(T record, RecordQueue<T> source);
+       protected abstract void emit(T record, RecordQueue<T> source);
 
        /** Summary of emit queues that can be used for logging. */
        public String printInfo() {
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
index 1a910cb..d2d637c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
@@ -75,6 +75,7 @@ import org.powermock.api.mockito.PowerMockito;
 import org.powermock.core.classloader.annotations.PrepareForTest;
 import org.powermock.modules.junit4.PowerMockRunner;
 
+import java.io.Serializable;
 import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -86,6 +87,7 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
@@ -920,6 +922,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                testHarness.open();
 
                final ConcurrentLinkedQueue<Object> results = 
testHarness.getOutput();
+               final AtomicBoolean throwOnCollect = new AtomicBoolean();
 
                @SuppressWarnings("unchecked")
                SourceFunction.SourceContext<String> sourceContext = new 
CollectingSourceContext(
@@ -929,11 +932,20 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                        }
 
                        @Override
+                       public void collect(Serializable element) {
+                               if (throwOnCollect.get()) {
+                                       throw new RuntimeException("expected");
+                               }
+                               super.collect(element);
+                       }
+
+                       @Override
                        public void emitWatermark(Watermark mark) {
                                results.add(mark);
                        }
                };
 
+               final AtomicReference<Exception> sourceThreadError = new 
AtomicReference<>();
                new Thread(
                        () -> {
                                try {
@@ -941,7 +953,7 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                                } catch (InterruptedException e) {
                                        // expected on cancel
                                } catch (Exception e) {
-                                       throw new RuntimeException(e);
+                                       sourceThreadError.set(e);
                                }
                        })
                        .start();
@@ -999,6 +1011,18 @@ public class FlinkKinesisConsumerTest extends TestLogger {
                expectedResults.add(new Watermark(3000));
                assertThat(results, 
org.hamcrest.Matchers.contains(expectedResults.toArray()));
 
+               // verify exception propagation
+               throwOnCollect.set(true);
+               shard1.put(Long.toString(record2 + 1));
+
+               Assert.assertNull(sourceThreadError.get());
+               deadline  = Deadline.fromNow(Duration.ofSeconds(10));
+               while (deadline.hasTimeLeft() && sourceThreadError.get() == 
null) {
+                       Thread.sleep(10);
+               }
+               Assert.assertNotNull(sourceThreadError.get());
+               Assert.assertNotNull("expected", 
sourceThreadError.get().getMessage());
+
                sourceFunc.cancel();
                testHarness.close();
        }

Reply via email to