This is an automated email from the ASF dual-hosted git repository. thw pushed a commit to branch release-1.8 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 11af452  [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
11af452 is described below

commit 11af4523801164539e186d836462f5884b561941
Author: Thomas Weise <t...@apache.org>
AuthorDate: Thu Feb 28 16:11:50 2019 -0800

    [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
---
 .../flink-streaming-kinesis-test/pom.xml           |  92 +++++++++++++++
 .../streaming/kinesis/test/KinesisExample.java     |  91 +++++++++++++++
 .../streaming/kinesis/test/KinesisExampleTest.java | 127 ++++++++++++++++++++
 .../kinesis/test/KinesisPubsubClient.java          | 128 +++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |  21 ++++
 flink-end-to-end-tests/run-pre-commit-tests.sh     |   1 +
 flink-end-to-end-tests/test-scripts/common.sh      |   2 +-
 .../test-scripts/test_streaming_kinesis.sh         |  63 ++++++++++
 8 files changed, 524 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
new file mode 100644
index 0000000..2774964
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements. See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership. The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License. You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied. See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+		 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+		 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<parent>
+		<artifactId>flink-end-to-end-tests</artifactId>
+		<groupId>org.apache.flink</groupId>
+		<version>1.8-SNAPSHOT</version>
+		<relativePath>..</relativePath>
+	</parent>
+	<modelVersion>4.0.0</modelVersion>
+
+	<artifactId>flink-streaming-kinesis-test_${scala.binary.version}</artifactId>
+	<name>flink-streaming-kinesis-test</name>
+	<packaging>jar</packaging>
+
+	<dependencies>
+		<dependency> <!-- KinesisExample reuses KafkaEvent, KafkaEventSchema and the other helpers of the Kafka test base -->
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-kafka-test-base_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>org.apache.flink</groupId>
+			<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<version>${junit.version}</version>
+			<scope>compile</scope> <!-- compile scope: KinesisExampleTest lives under src/main/java and uses org.junit.Assert -->
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<!-- Use the shade plugin to build a fat jar for the Kinesis connector test -->
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>fat-jar-kinesis-example</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<shadeTestJar>false</shadeTestJar>
+							<shadedArtifactAttached>false</shadedArtifactAttached>
+							<createDependencyReducedPom>false</createDependencyReducedPom>
+							<transformers>
+								<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+									<mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass>
+								</transformer>
+							</transformers>
+							<finalName>KinesisExample</finalName> <!-- yields target/KinesisExample.jar, the TEST_JAR of test_streaming_kinesis.sh -->
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
+		</plugins>
+	</build>
+
+</project>
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
new file mode 100644
index 0000000..4957c35
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
+import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
+import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
+import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This will read String messages
+ * from the input stream, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key for which the results are written back to another stream.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kinesis consumer. For demonstration purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ * 	--input-stream test-input --output-stream test-output --aws.endpoint https://localhost:4567 --flink.stream.initpos TRIM_HORIZON
+ */
+public class KinesisExample {
+	public static void main(String[] args) throws Exception {
+		// parse input arguments; besides the two stream names they are handed to the connector as its configuration
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+		StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+		String inputStream = parameterTool.getRequired("input-stream");
+		String outputStream = parameterTool.getRequired("output-stream");
+
+		FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
+			inputStream,
+			new KafkaEventSchema(),
+			parameterTool.getProperties());
+		consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());
+
+		Properties producerProperties = new Properties(parameterTool.getProperties());
+		// producer needs region even when URL is specified (getProperty honors the defaults, putIfAbsent would not)
+		producerProperties.setProperty(ConsumerConfigConstants.AWS_REGION, producerProperties.getProperty(ConsumerConfigConstants.AWS_REGION, "us-east-1"));
+		// test driver does not deaggregate, so aggregation is off unless explicitly requested
+		producerProperties.setProperty("AggregationEnabled", producerProperties.getProperty("AggregationEnabled", String.valueOf(false)));
+
+		// KPL does not recognize endpoint URL, pass host and port instead (scheme default when the URL has no port)
+		String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
+		if (kinesisUrl != null) {
+			URL url = new URL(kinesisUrl);
+			producerProperties.put("KinesisEndpoint", url.getHost());
+			producerProperties.put("KinesisPort", Integer.toString(url.getPort() != -1 ? url.getPort() : url.getDefaultPort()));
+			producerProperties.put("VerifyCertificate", "false");
+		}
+
+		FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
+			new KafkaEventSchema(),
+			producerProperties);
+		producer.setDefaultStream(outputStream);
+		producer.setDefaultPartition("fakePartition");
+
+		DataStream<KafkaEvent> input = env
+			.addSource(consumer)
+			.keyBy("word")
+			.map(new RollingAdditionMapper());
+
+		input.addSink(producer);
+		env.execute();
+	}
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
new file mode 100644
index 0000000..1a6d6d7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}: creates the streams, runs the example in a background thread, publishes input and validates the output stream.
+ */
+public class KinesisExampleTest {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class);
+
+	/**
+	 * Interface to the pubsub system for this test; implemented for Kinesis by {@code KinesisPubsubClient}.
+	 */
+	interface PubsubClient {
+		void createTopic(String topic, int partitions, Properties props) throws Exception;
+
+		void sendMessage(String topic, String msg);
+
+		List<String> readAllMessages(String streamName) throws Exception;
+	}
+
+	public static void main(String[] args) throws Exception {
+		LOG.info("System properties: {}", System.getProperties());
+		final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+		String inputStream = parameterTool.getRequired("input-stream");
+		String outputStream = parameterTool.getRequired("output-stream");
+
+		PubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
+		pubsub.createTopic(inputStream, 2, parameterTool.getProperties());
+		pubsub.createTopic(outputStream, 2, parameterTool.getProperties());
+
+		// The example job needs to start after streams are created and run in parallel to the validation logic.
+		// The thread that runs the job won't terminate, we don't have a job reference to cancel it.
+		// Once results are validated, the driver main thread will exit; job/cluster will be terminated from script.
+		final AtomicReference<Exception> executeException = new AtomicReference<>();
+		Thread executeThread =
+			new Thread(
+				() -> {
+					try {
+						KinesisExample.main(args);
+						// this message won't appear in the log,
+						// job is terminated when shutting down cluster
+						LOG.info("executed program");
+					} catch (Exception e) {
+						executeException.set(e); // handed to the main thread, which rethrows it below
+					}
+				});
+		executeThread.start();
+
+		// generate input, one record per (word,frequency,timestamp) message
+		String[] messages = {
+			"elephant,5,45218",
+			"squirrel,12,46213",
+			"bee,3,51348",
+			"squirrel,22,52444",
+			"bee,10,53412",
+			"elephant,9,54867"
+		};
+		for (String msg : messages) {
+			pubsub.sendMessage(inputStream, msg);
+		}
+		LOG.info("generated records");
+
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60)); // poll the output stream for up to 60 seconds
+		List<String> results = pubsub.readAllMessages(outputStream);
+		while (deadline.hasTimeLeft() && executeException.get() == null && results.size() < messages.length) {
+			LOG.info("waiting for results..");
+			Thread.sleep(1000);
+			results = pubsub.readAllMessages(outputStream);
+		}
+
+		if (executeException.get() != null) { // the job thread failed, surface its exception
+			throw executeException.get();
+		}
+
+		LOG.info("results: {}", results);
+		Assert.assertEquals("Results received from '" + outputStream + "': " + results,
+			messages.length, results.size());
+
+		String[] expectedResults = { // per word, the frequency is the running sum of the inputs sent above
+			"elephant,5,45218",
+			"elephant,14,54867",
+			"squirrel,12,46213",
+			"squirrel,34,52444",
+			"bee,3,51348",
+			"bee,13,53412"
+		};
+
+		for (String expectedResult : expectedResults) { // only presence is checked, not the order of the results
+			Assert.assertTrue(expectedResult, results.contains(expectedResult));
+		}
+
+		// TODO: main thread needs to create job or CLI fails with:
+		// "The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment."
+		System.out.println("test finished");
+		System.exit(0); // NOTE(review): presumably needed because the job thread never terminates (see above) - confirm
+	}
+
+}
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
new file mode 100644
index 0000000..486b565
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
+import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
+import org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+	private static final Logger LOG = LoggerFactory.getLogger(KinesisPubsubClient.class);
+
+	private final AmazonKinesis kinesisClient; // used to create/delete streams and to publish records
+	private final Properties properties; // used to create the KinesisProxy that reads records back
+
+	KinesisPubsubClient(Properties properties) {
+		this.kinesisClient = createClientWithCredentials(properties);
+		this.properties = properties;
+	}
+
+	@Override
+	public void createTopic(String stream, int shards, Properties props) throws Exception {
+		try {
+			kinesisClient.describeStream(stream);
+			kinesisClient.deleteStream(stream);
+		} catch (ResourceNotFoundException rnfe) {
+			// expected when stream doesn't exist
+		}
+
+		kinesisClient.createStream(stream, shards);
+		Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5)); // returns silently if the stream is not ready by then
+		while (deadline.hasTimeLeft()) {
+			try {
+				Thread.sleep(250); // sleep for a bit for stream to be created
+				if (kinesisClient.describeStream(stream).getStreamDescription()
+					.getShards().size() != shards) {
+					// not fully created yet
+					continue;
+				}
+				break;
+			} catch (ResourceNotFoundException rnfe) {
+				// not ready yet
+			}
+		}
+	}
+
+	@Override
+	public void sendMessage(String topic, String msg) {
+		PutRecordRequest putRecordRequest = new PutRecordRequest();
+		putRecordRequest.setStreamName(topic);
+		putRecordRequest.setPartitionKey("fakePartitionKey");
+		putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes(java.nio.charset.StandardCharsets.UTF_8))); // fixed charset, not the platform default
+		PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
+		LOG.info("added record: {}", putRecordResult.getSequenceNumber());
+	}
+
+	@Override
+	public List<String> readAllMessages(String streamName) throws Exception {
+		KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
+		Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
+		streamNamesWithLastSeenShardIds.put(streamName, null);
+
+		GetShardListResult shardListResult = kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
+		int maxRecordsToFetch = 10; // passed to the single getRecords call that is made per shard
+
+		List<String> messages = new ArrayList<>();
+		// retrieve records from all shards, always starting at the oldest available record
+		for (StreamShardHandle ssh : shardListResult.getRetrievedShardListOfStream(streamName)) {
+			String shardIterator = kinesisProxy.getShardIterator(ssh, "TRIM_HORIZON", null);
+			GetRecordsResult getRecordsResult = kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
+			List<Record> aggregatedRecords = getRecordsResult.getRecords();
+			for (Record record : aggregatedRecords) {
+				messages.add(java.nio.charset.StandardCharsets.UTF_8.decode(record.getData().asReadOnlyBuffer()).toString()); // honors position/limit
+			}
+		}
+		return messages;
+	}
+
+	private static AmazonKinesis createClientWithCredentials(Properties props) throws AmazonClientException {
+		AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider();
+		return AmazonKinesisClientBuilder.standard()
+			.withCredentials(credentialsProvider)
+			.withEndpointConfiguration(
+				new AwsClientBuilder.EndpointConfiguration(
+					props.getProperty(ConsumerConfigConstants.AWS_ENDPOINT), "us-east-1")) // region is fixed, only the endpoint is configurable
+			.build();
+	}
+
+}
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f6c36dc..b50ca6b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,6 +68,27 @@ under the License.
 		<module>flink-streaming-kafka010-test</module>
 	</modules>
 
+	<!-- See main pom.xml for explanation of profiles -->
+	<profiles>
+		<!--
+			We include the kinesis module only optionally because it contains a dependency
+			licensed under the "Amazon Software License".
+			In accordance with the discussion in https://issues.apache.org/jira/browse/LEGAL-198
+			this is an optional module for Flink.
+ --> + <profile> + <id>include-kinesis</id> + <activation> + <property> + <name>include-kinesis</name> + </property> + </activation> + <modules> + <module>flink-streaming-kinesis-test</module> + </modules> + </profile> + </profiles> + <build> <plugins> <plugin> diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh index 4a9a1a0..f40fd56 100755 --- a/flink-end-to-end-tests/run-pre-commit-tests.sh +++ b/flink-end-to-end-tests/run-pre-commit-tests.sh @@ -56,6 +56,7 @@ run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wo run_test "Kafka 0.10 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh" run_test "Kafka 0.11 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka011.sh" run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh" +run_test "Kinesis end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kinesis.sh" run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh" run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh" run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh" diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh index c06249e..1a54aba 100644 --- a/flink-end-to-end-tests/test-scripts/common.sh +++ b/flink-end-to-end-tests/test-scripts/common.sh @@ -47,7 +47,7 @@ cd $TEST_INFRA_DIR TEST_INFRA_DIR=`pwd -P` cd $TEST_ROOT -NODENAME=`hostname -f` +NODENAME=${NODENAME:-`hostname -f`} # REST_PROTOCOL and CURL_SSL_ARGS can be modified in common_ssl.sh if SSL is activated # they should be used in curl command to query Flink REST API diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh new file mode 100755 index 0000000..1981b13 
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# Kinesalite doesn't support CBOR
+export AWS_CBOR_DISABLE=1
+
+# Required by the KPL native process
+export AWS_ACCESS_KEY_ID=flinkKinesisTestFakeAccessKeyId
+export AWS_SECRET_KEY=flinkKinesisTestFakeAccessKey
+
+KINESALITE_PORT=4567
+
+#docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
+# override entrypoint to enable SSL (the commented-out command above is the variant without --ssl)
+docker run -d --rm --entrypoint "/tini" --name flink-test-kinesis -p "${KINESALITE_PORT}:${KINESALITE_PORT}" instructure/kinesalite -- /usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+
+# reveal potential issues with the container in the CI environment
+docker logs flink-test-kinesis
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+  # job needs to stop before kinesalite
+  stop_cluster
+  echo "terminating kinesalite"
+  docker kill flink-test-kinesis
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# prefix com.amazonaws.sdk.disableCertChecking to account for shading
+DISABLE_CERT_CHECKING_JAVA_OPTS="-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking"
+
+export FLINK_ENV_JAVA_OPTS="${DISABLE_CERT_CHECKING_JAVA_OPTS}"
+start_cluster
+
+TEST_JAR="${END_TO_END_DIR}/flink-streaming-kinesis-test/target/KinesisExample.jar"
+JVM_ARGS="${DISABLE_CERT_CHECKING_JAVA_OPTS}" \
+"$FLINK_DIR"/bin/flink run -p 1 -c org.apache.flink.streaming.kinesis.test.KinesisExampleTest "$TEST_JAR" \
+  --input-stream test-input --output-stream test-output \
+  --aws.endpoint "https://localhost:${KINESALITE_PORT}" --aws.credentials.provider.basic.secretkey fakekey --aws.credentials.provider.basic.accesskeyid fakeid \
+  --flink.stream.initpos TRIM_HORIZON \
+  --flink.partition-discovery.interval-millis 1000