This is an automated email from the ASF dual-hosted git repository. arvid pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 09a4af6 [FLINK-22613][tests] Fix FlinkKinesisITCase.testStopWithSavepoint. 09a4af6 is described below commit 09a4af662339ae320f5a3cafd59ec816a1048b7a Author: Arvid Heise <ar...@ververica.com> AuthorDate: Tue May 25 18:21:49 2021 +0200 [FLINK-22613][tests] Fix FlinkKinesisITCase.testStopWithSavepoint. The test relies on a few elements remaining pending after stop-with-savepoint; however, that can only be guaranteed heuristically: we cannot block the task thread, or else the respective savepoint will not succeed, but we also cannot add infinite input, as it's an IT test against Kinesis. Here, the fix is to add many more elements to the Kinesis stream. An optimized sendMessage on the test client ensures timely setup. --- .../connectors/kinesis/FlinkKinesisITCase.java | 37 ++++++++++++--------- .../kinesis/testutils/KinesisPubsubClient.java | 38 +++++++++++++++------- 2 files changed, 48 insertions(+), 27 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java index 0604a1c..f5a4439 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisITCase.java @@ -21,6 +21,7 @@ import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import 
org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants.InitialPosition; import org.apache.flink.streaming.connectors.kinesis.testutils.KinesaliteContainer; @@ -30,7 +31,6 @@ import org.apache.flink.util.TestLogger; import org.junit.Before; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; @@ -87,20 +87,15 @@ public class FlinkKinesisITCase extends TestLogger { * <li>With the fix, the job proceeds and we can lift the backpressure. * </ol> */ - @Ignore("Ignored until FLINK-22613 is fixed") @Test public void testStopWithSavepoint() throws Exception { client.createTopic(TEST_STREAM, 1, new Properties()); // add elements to the test stream - int numElements = 10; - List<String> elements = - IntStream.range(0, numElements) - .mapToObj(String::valueOf) - .collect(Collectors.toList()); - for (String element : elements) { - client.sendMessage(TEST_STREAM, element); - } + int numElements = 1000; + client.sendMessage( + TEST_STREAM, + IntStream.range(0, numElements).mapToObj(String::valueOf).toArray(String[]::new)); StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(1); @@ -110,6 +105,7 @@ public class FlinkKinesisITCase extends TestLogger { FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>(TEST_STREAM, STRING_SCHEMA, config); + DataStream<String> stream = env.addSource(consumer).map(new WaitingMapper()); // call stop with savepoint in another thread ForkJoinTask<Object> stopTask = ForkJoinPool.commonPool() @@ -120,14 +116,18 @@ public class FlinkKinesisITCase extends TestLogger { WaitingMapper.stopped = true; return null; }); - try { - List<String> result = - env.addSource(consumer).map(new WaitingMapper()).executeAndCollect(10000); + List<String> result = stream.executeAndCollect(10000); // stop with savepoint will most likely only return a small subset of the elements // validate that the 
prefix is as expected assertThat(result, hasSize(lessThan(numElements))); - assertThat(result, equalTo(elements.subList(0, result.size()))); + assertThat( + result, + equalTo( + IntStream.range(0, numElements) + .mapToObj(String::valueOf) + .collect(Collectors.toList()) + .subList(0, result.size()))); } finally { stopTask.cancel(true); } @@ -143,8 +143,13 @@ public class FlinkKinesisITCase extends TestLogger { } private static class WaitingMapper implements MapFunction<String, String> { - static CountDownLatch firstElement = new CountDownLatch(1); - static volatile boolean stopped = false; + static CountDownLatch firstElement; + static volatile boolean stopped; + + WaitingMapper() { + firstElement = new CountDownLatch(1); + stopped = false; + } @Override public String map(String value) throws Exception { diff --git a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java index 0d15656..0f0cb6c 100644 --- a/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java +++ b/flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/testutils/KinesisPubsubClient.java @@ -25,14 +25,18 @@ import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy; import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface; import org.apache.flink.streaming.connectors.kinesis.util.AWSUtil; +import org.apache.flink.shaded.guava18.com.google.common.collect.Lists; + import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.client.builder.AwsClientBuilder; import com.amazonaws.services.kinesis.AmazonKinesis; import 
com.amazonaws.services.kinesis.AmazonKinesisClientBuilder; import com.amazonaws.services.kinesis.model.GetRecordsResult; -import com.amazonaws.services.kinesis.model.PutRecordRequest; -import com.amazonaws.services.kinesis.model.PutRecordResult; +import com.amazonaws.services.kinesis.model.PutRecordsRequest; +import com.amazonaws.services.kinesis.model.PutRecordsRequestEntry; +import com.amazonaws.services.kinesis.model.PutRecordsResult; +import com.amazonaws.services.kinesis.model.PutRecordsResultEntry; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kinesis.model.ResourceNotFoundException; import org.slf4j.Logger; @@ -41,11 +45,13 @@ import org.slf4j.LoggerFactory; import java.nio.ByteBuffer; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Properties; import java.util.function.Function; +import java.util.stream.Collectors; /** * Simple client to publish and retrieve messages, using the AWS Kinesis SDK and the Flink Kinesis @@ -87,17 +93,27 @@ public class KinesisPubsubClient { } } - public void sendMessage(String topic, String msg) { - sendMessage(topic, msg.getBytes()); + public void sendMessage(String topic, String... messages) { + sendMessage(topic, Arrays.stream(messages).map(String::getBytes).toArray(byte[][]::new)); } - public void sendMessage(String topic, byte[] data) { - PutRecordRequest putRecordRequest = new PutRecordRequest(); - putRecordRequest.setStreamName(topic); - putRecordRequest.setPartitionKey("fakePartitionKey"); - putRecordRequest.withData(ByteBuffer.wrap(data)); - PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest); - LOG.info("added record: {}", putRecordResult.getSequenceNumber()); + public void sendMessage(String topic, byte[]... 
messages) { + for (List<byte[]> partition : Lists.partition(Arrays.asList(messages), 500)) { + List<PutRecordsRequestEntry> entries = + partition.stream() + .map( + msg -> + new PutRecordsRequestEntry() + .withPartitionKey("fakePartitionKey") + .withData(ByteBuffer.wrap(msg))) + .collect(Collectors.toList()); + PutRecordsRequest requests = + new PutRecordsRequest().withStreamName(topic).withRecords(entries); + PutRecordsResult putRecordResult = kinesisClient.putRecords(requests); + for (PutRecordsResultEntry result : putRecordResult.getRecords()) { + LOG.debug("added record: {}", result.getSequenceNumber()); + } + } } public List<String> readAllMessages(String streamName) throws Exception {