GitHub user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/6021#discussion_r197070282
  
    --- Diff: flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisProducerTest.java ---
    @@ -267,6 +268,79 @@ public void go() throws Exception {
                testHarness.close();
        }
     
    +   /**
    +    * Test ensuring that the producer blocks if the queue limit is exceeded,
    +    * until the queue length drops below the limit;
    +    * we set a timeout because the test will not finish if the logic is broken.
    +    */
    +   @Test(timeout = 10000)
    +   public void testBackpressure() throws Throwable {
    +           final DummyFlinkKinesisProducer<String> producer = new DummyFlinkKinesisProducer<>(new SimpleStringSchema());
    +           producer.setQueueLimit(1);
    +
    +           OneInputStreamOperatorTestHarness<String, Object> testHarness =
    +                           new OneInputStreamOperatorTestHarness<>(new StreamSink<>(producer));
    +
    +           testHarness.open();
    +
    +           UserRecordResult result = mock(UserRecordResult.class);
    +           when(result.isSuccessful()).thenReturn(true);
    +
    +           CheckedThread msg1 = new CheckedThread() {
    +                   @Override
    +                   public void go() throws Exception {
    +                           testHarness.processElement(new StreamRecord<>("msg-1"));
    +                   }
    +           };
    +           msg1.start();
    +           msg1.trySync(100);
    +           assertFalse("Flush triggered before reaching queue limit", msg1.isAlive());
    --- End diff --
    
    I wonder if this would introduce flakiness in the test.
    @fmthoma could you elaborate a bit here?
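
    To make the concern concrete: the assertion above passes only if the
    thread finishes within the fixed 100 ms given to `trySync`, which a
    loaded CI machine may not guarantee. A minimal sketch of one common way
    to reduce that sensitivity is below. It is not taken from this PR, and
    `LatchSyncSketch`, `runAsync` and `completesWithin` are hypothetical
    names used only for illustration. The idea is to wait on a latch with a
    generous upper bound; the wait returns as soon as the work is done, so
    the larger bound costs nothing when the test passes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical helper, not part of Flink or of this pull request.
class LatchSyncSketch {

    /** Runs the work on a daemon thread and returns a latch that opens when it finishes. */
    static CountDownLatch runAsync(Runnable work) {
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                work.run();
            } finally {
                // Signal completion even if the work throws.
                done.countDown();
            }
        });
        worker.setDaemon(true); // a blocked worker must not keep the JVM alive
        worker.start();
        return done;
    }

    /** Returns true if the work finished before the timeout elapsed. */
    static boolean completesWithin(Runnable work, long timeoutMillis) throws InterruptedException {
        // await() returns immediately once the latch opens, so a large
        // timeout only matters in the failing case.
        return runAsync(work).await(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        // Non-blocking work: expected to finish well inside the generous bound.
        System.out.println(completesWithin(() -> { }, 5_000));

        // Work that blocks on a latch nobody opens: expected to time out.
        CountDownLatch never = new CountDownLatch(1);
        System.out.println(completesWithin(() -> {
            try {
                never.await();
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
        }, 50));
    }
}
```

    The "should still be blocked" direction remains inherently timing-based
    (a short wait can only show that the thread has not finished yet), so a
    sketch like this mainly helps the "should have finished" assertions.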

