This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 4a4dae0  [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock
4a4dae0 is described below

commit 4a4dae0d81b830d5c4245d61ca95dbffaf2a1f2d
Author: Thomas Weise <t...@apache.org>
AuthorDate: Mon Sep 16 13:33:42 2019 -0700

    [FLINK-14107][kinesis] Erroneous queue selection in record emitter may lead to deadlock
---
 .../connectors/kinesis/util/RecordEmitter.java     |  5 ++
 .../connectors/kinesis/util/RecordEmitterTest.java | 66 ++++++++++++++++++++--
 2 files changed, 65 insertions(+), 6 deletions(-)

diff --git a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
index 17344b1..7fa21fe 100644
--- a/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
+++ b/flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitter.java
@@ -239,6 +239,11 @@ public abstract class RecordEmitter<T extends TimestampedValue> implements Runna
                        }
                        if (record == null) {
                                this.emptyQueues.put(min, true);
+                       } else if (nextQueue != null && nextQueue.headTimestamp 
> min.headTimestamp) {
+                               // if we stopped emitting due to reaching max 
timestamp,
+                               // the next queue may not be the new min
+                               heads.offer(nextQueue);
+                               nextQueue = min;
                        } else {
                                heads.offer(min);
                        }
diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
index 1948237..84949cc 100644
--- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
+++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/util/RecordEmitterTest.java
@@ -17,12 +17,14 @@
 
 package org.apache.flink.streaming.connectors.kinesis.util;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;
 
 import org.hamcrest.Matchers;
 import org.junit.Assert;
 import org.junit.Test;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
@@ -32,10 +34,10 @@ import java.util.concurrent.Executors;
 /** Test for {@link RecordEmitter}. */
 public class RecordEmitterTest {
 
-       static List<TimestampedValue> results = 
Collections.synchronizedList(new ArrayList<>());
-
        private class TestRecordEmitter extends RecordEmitter<TimestampedValue> 
{
 
+               private List<TimestampedValue> results = 
Collections.synchronizedList(new ArrayList<>());
+
                private TestRecordEmitter() {
                        super(DEFAULT_QUEUE_CAPACITY);
                }
@@ -68,14 +70,66 @@ public class RecordEmitterTest {
                ExecutorService executor = Executors.newSingleThreadExecutor();
                executor.submit(emitter);
 
-               long timeout = System.currentTimeMillis() + 10_000;
-               while (results.size() != 4 && System.currentTimeMillis() < 
timeout) {
-                       Thread.sleep(100);
+               Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
+               while (emitter.results.size() != 4 && dl.hasTimeLeft()) {
+                       Thread.sleep(10);
                }
                emitter.stop();
                executor.shutdownNow();
 
-               Assert.assertThat(results, Matchers.contains(one, five, two, 
ten));
+               Assert.assertThat(emitter.results, Matchers.contains(one, five, 
two, ten));
        }
 
+       @Test
+       public void testRetainMinAfterReachingLimit() throws Exception {
+
+               TestRecordEmitter emitter = new TestRecordEmitter();
+
+               final TimestampedValue<String> one = new 
TimestampedValue<>("1", 1);
+               final TimestampedValue<String> two = new 
TimestampedValue<>("2", 2);
+               final TimestampedValue<String> three = new 
TimestampedValue<>("3", 3);
+               final TimestampedValue<String> ten = new 
TimestampedValue<>("10", 10);
+               final TimestampedValue<String> eleven = new 
TimestampedValue<>("11", 11);
+
+               final TimestampedValue<String> twenty = new 
TimestampedValue<>("20", 20);
+               final TimestampedValue<String> thirty = new 
TimestampedValue<>("30", 30);
+
+               final RecordEmitter.RecordQueue<TimestampedValue> queue0 = 
emitter.getQueue(0);
+               final RecordEmitter.RecordQueue<TimestampedValue> queue1 = 
emitter.getQueue(1);
+
+               queue0.put(one);
+               queue0.put(two);
+               queue0.put(three);
+               queue0.put(ten);
+               queue0.put(eleven);
+
+               queue1.put(twenty);
+               queue1.put(thirty);
+
+               emitter.setMaxLookaheadMillis(1);
+               emitter.setCurrentWatermark(5);
+
+               ExecutorService executor = Executors.newSingleThreadExecutor();
+               executor.submit(emitter);
+               try {
+                       // emits one record past the limit
+                       Deadline dl = Deadline.fromNow(Duration.ofSeconds(10));
+                       while (emitter.results.size() != 4 && dl.hasTimeLeft()) 
{
+                               Thread.sleep(10);
+                       }
+                       Assert.assertThat(emitter.results, 
Matchers.contains(one, two, three, ten));
+
+                       // advance watermark, emits remaining record from queue0
+                       emitter.setCurrentWatermark(10);
+                       dl = Deadline.fromNow(Duration.ofSeconds(10));
+                       while (emitter.results.size() != 5 && dl.hasTimeLeft()) 
{
+                               Thread.sleep(10);
+                       }
+                       Assert.assertThat(emitter.results, 
Matchers.contains(one, two, three, ten, eleven));
+               }
+               finally {
+                       emitter.stop();
+                       executor.shutdownNow();
+               }
+       }
 }

Reply via email to