This is an automated email from the ASF dual-hosted git repository.

arvid pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 09a4af6  [FLINK-22613][tests] Fix 
FlinkKinesisITCase.testStopWithSavepoint.
09a4af6 is described below

commit 09a4af662339ae320f5a3cafd59ec816a1048b7a
Author: Arvid Heise <ar...@ververica.com>
AuthorDate: Tue May 25 18:21:49 2021 +0200

    [FLINK-22613][tests] Fix FlinkKinesisITCase.testStopWithSavepoint.
    
    The test relies on a few elements remaining pending after 
stop-with-savepoint, however that can only be guaranteed heuristically: We 
cannot block task thread or else the respective savepoint will not succeed but 
we also cannot add infinite input as it's an IT test against Kinesis. Here, the 
fix is to add much more elements to Kinesis stream. An optimized sendMessage on 
the test client ensures timely setup.
---
 .../connectors/kinesis/FlinkKinesisITCase.java     | 37 ++++++++++++---------
 .../kinesis/testutils/KinesisPubsubClient.java     | 38 +++++++++++++++-------
 2 files changed, 48 insertions(+), 27 deletions(-)

diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
index 0604a1c..f5a4439 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java
@@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SimpleStringSchema;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition;
 import 
org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer;
@@ -30,7 +31,6 @@ import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
 import org.junit.ClassRule;
-import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -87,20 +87,15 @@ public class FlinkKinesisITCase extends TestLogger {
      *   <li>With the fix, the job proceeds and we can lift the backpressure.
      * </ol>
      */
-    @Ignore("Ignored until FLINK-22613 is fixed")
     @Test
     public void testStopWithSavepoint() throws Exception {
         client.createTopic(TEST_STREAM, 1, new Properties());
 
         // add elements to the test stream
-        int numElements = 10;
-        List<String> elements =
-                IntStream.range(0, numElements)
-                        .mapToObj(String::valueOf)
-                        .collect(Collectors.toList());
-        for (String element : elements) {
-            client.sendMessage(TEST_STREAM, element);
-        }
+        int numElements = 1000;
+        client.sendMessage(
+                TEST_STREAM,
+                IntStream.range(0, 
numElements).mapToObj(String::valueOf).toArray(String[]::new));
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
         env.setParallelism(1);
@@ -110,6 +105,7 @@ public class FlinkKinesisITCase extends TestLogger {
         FlinkKinesisConsumer<String> consumer =
                 new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config);
 
+        DataStream<String> stream = env.addSource(consumer).map(new 
WaitingMapper());
         // call stop with savepoint in another thread
         ForkJoinTask<Object> stopTask =
                 ForkJoinPool.commonPool()
@@ -120,14 +116,18 @@ public class FlinkKinesisITCase extends TestLogger {
                                     WaitingMapper.stopped = true;
                                     return null;
                                 });
-
         try {
-            List<String> result =
-                    env.addSource(consumer).map(new 
WaitingMapper()).executeAndCollect(10000);
+            List<String> result = stream.executeAndCollect(10000);
             // stop with savepoint will most likely only return a small subset 
of the elements
             // validate that the prefix is as expected
             assertThat(result, hasSize(lessThan(numElements)));
-            assertThat(result, equalTo(elements.subList(0, result.size())));
+            assertThat(
+                    result,
+                    equalTo(
+                            IntStream.range(0, numElements)
+                                    .mapToObj(String::valueOf)
+                                    .collect(Collectors.toList())
+                                    .subList(0, result.size())));
         } finally {
             stopTask.cancel(true);
         }
@@ -143,8 +143,13 @@ public class FlinkKinesisITCase extends TestLogger {
     }
 
     private static class WaitingMapper implements MapFunction<String, String> {
-        static CountDownLatch firstElement = new CountDownLatch(1);
-        static volatile boolean stopped = false;
+        static CountDownLatch firstElement;
+        static volatile boolean stopped;
+
+        WaitingMapper() {
+            firstElement = new CountDownLatch(1);
+            stopped = false;
+        }
 
         @Override
         public String map(String value) throws Exception {
diff --git 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
index 0d15656..0f0cb6c 100644
--- 
a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
+++ 
b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java
@@ -25,14 +25,18 @@ import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
 import 
org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
 import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil;
 
+import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
+
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.auth.AWSCredentialsProvider;
 import com.amazonaws.client.builder.AwsClientBuilder;
 import com.amazonaws.services.kinesis.AmazonKinesis;
 import com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
 import com.amazonaws.services.kinesis.model.GetRecordsResult;
-import com.amazonaws.services.kinesis.model.PutRecordRequest;
-import com.amazonaws.services.kinesis.model.PutRecordResult;
+import com.amazonaws.services.kinesis.model.PutRecordsRequest;
+import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry;
+import com.amazonaws.services.kinesis.model.PutRecordsResult;
+import com.amazonaws.services.kinesis.model.PutRecordsResultEntry;
 import com.amazonaws.services.kinesis.model.Record;
 import com.amazonaws.services.kinesis.model.ResourceNotFoundException;
 import org.slf4j.Logger;
@@ -41,11 +45,13 @@ import org.slf4j.LoggerFactory;
 import java.nio.ByteBuffer;
 import java.time.Duration;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 import java.util.function.Function;
+import java.util.stream.Collectors;
 
 /**
  * Simple client to publish and retrieve messages, using the AWS Kinesis SDK 
and the Flink Kinesis
@@ -87,17 +93,27 @@ public class KinesisPubsubClient {
         }
     }
 
-    public void sendMessage(String topic, String msg) {
-        sendMessage(topic, msg.getBytes());
+    public void sendMessage(String topic, String... messages) {
+        sendMessage(topic, 
Arrays.stream(messages).map(String::getBytes).toArray(byte[][]::new));
     }
 
-    public void sendMessage(String topic, byte[] data) {
-        PutRecordRequest putRecordRequest = new PutRecordRequest();
-        putRecordRequest.setStreamName(topic);
-        putRecordRequest.setPartitionKey("fakePartitionKey");
-        putRecordRequest.withData(ByteBuffer.wrap(data));
-        PutRecordResult putRecordResult = 
kinesisClient.putRecord(putRecordRequest);
-        LOG.info("added record: {}", putRecordResult.getSequenceNumber());
+    public void sendMessage(String topic, byte[]... messages) {
+        for (List<byte[]> partition : Lists.partition(Arrays.asList(messages), 
500)) {
+            List<PutRecordsRequestEntry> entries =
+                    partition.stream()
+                            .map(
+                                    msg ->
+                                            new PutRecordsRequestEntry()
+                                                    
.withPartitionKey("fakePartitionKey")
+                                                    
.withData(ByteBuffer.wrap(msg)))
+                            .collect(Collectors.toList());
+            PutRecordsRequest requests =
+                    new 
PutRecordsRequest().withStreamName(topic).withRecords(entries);
+            PutRecordsResult putRecordResult = 
kinesisClient.putRecords(requests);
+            for (PutRecordsResultEntry result : putRecordResult.getRecords()) {
+                LOG.debug("added record: {}", result.getSequenceNumber());
+            }
+        }
     }
 
     public List<String> readAllMessages(String streamName) throws Exception {

Reply via email to