This is an automated email from the ASF dual-hosted git repository.

thw pushed a commit to branch release-1.8
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.8 by this push:
     new 11af452  [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
11af452 is described below

commit 11af4523801164539e186d836462f5884b561941
Author: Thomas Weise <t...@apache.org>
AuthorDate: Thu Feb 28 16:11:50 2019 -0800

    [FLINK-9007] [kinesis] [e2e] Add Kinesis end-to-end test
---
 .../flink-streaming-kinesis-test/pom.xml           |  92 +++++++++++++++
 .../streaming/kinesis/test/KinesisExample.java     |  91 +++++++++++++++
 .../streaming/kinesis/test/KinesisExampleTest.java | 127 ++++++++++++++++++++
 .../kinesis/test/KinesisPubsubClient.java          | 128 +++++++++++++++++++++
 flink-end-to-end-tests/pom.xml                     |  21 ++++
 flink-end-to-end-tests/run-pre-commit-tests.sh     |   1 +
 flink-end-to-end-tests/test-scripts/common.sh      |   2 +-
 .../test-scripts/test_streaming_kinesis.sh         |  63 ++++++++++
 8 files changed, 524 insertions(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
new file mode 100644
index 0000000..2774964
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/pom.xml
@@ -0,0 +1,92 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one
+or more contributor license agreements.  See the NOTICE file
+distributed with this work for additional information
+regarding copyright ownership.  The ASF licenses this file
+to you under the Apache License, Version 2.0 (the
+"License"); you may not use this file except in compliance
+with the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing,
+software distributed under the License is distributed on an
+"AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+KIND, either express or implied.  See the License for the
+specific language governing permissions and limitations
+under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+                xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+                xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+       <parent>
+               <artifactId>flink-end-to-end-tests</artifactId>
+               <groupId>org.apache.flink</groupId>
+               <version>1.8-SNAPSHOT</version>
+               <relativePath>..</relativePath>
+       </parent>
+       <modelVersion>4.0.0</modelVersion>
+
+       <artifactId>flink-streaming-kinesis-test_${scala.binary.version}</artifactId>
+       <name>flink-streaming-kinesis-test</name>
+       <packaging>jar</packaging>
+
+       <dependencies>
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-kafka-test-base_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-connector-kinesis_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>org.apache.flink</groupId>
+                       <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+                       <version>${project.version}</version>
+               </dependency>
+
+               <dependency>
+                       <groupId>junit</groupId>
+                       <artifactId>junit</artifactId>
+                       <version>${junit.version}</version>
+                       <scope>compile</scope>
+               </dependency>
+       </dependencies>
+
+       <build>
+               <plugins>
+                       <!-- Use the shade plugin to build a fat jar for the Kinesis connector test -->
+                       <plugin>
+                               <groupId>org.apache.maven.plugins</groupId>
+                               <artifactId>maven-shade-plugin</artifactId>
+                               <executions>
+                                       <execution>
+                                               <id>fat-jar-kinesis-example</id>
+                                               <phase>package</phase>
+                                               <goals>
+                                                       <goal>shade</goal>
+                                               </goals>
+                                               <configuration>
+                                                       <shadeTestJar>false</shadeTestJar>
+                                                       <shadedArtifactAttached>false</shadedArtifactAttached>
+                                                       <createDependencyReducedPom>false</createDependencyReducedPom>
+                                                       <transformers>
+                                                               <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                                                       <mainClass>org.apache.flink.streaming.kinesis.test.KinesisExample</mainClass>
+                                                               </transformer>
+                                                       </transformers>
+                                                       <finalName>KinesisExample</finalName>
+                                               </configuration>
+                                       </execution>
+                               </executions>
+                       </plugin>
+               </plugins>
+       </build>
+
+</project>
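
Note: junit is declared with compile scope (rather than the usual test
scope) because the KinesisExampleTest driver below lives under
src/main/java and calls org.junit.Assert at job runtime, so it has to be
bundled into the KinesisExample fat jar by the shade plugin.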
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
new file mode 100644
index 0000000..4957c35
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExample.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
+import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.kafka.test.base.CustomWatermarkExtractor;
+import org.apache.flink.streaming.kafka.test.base.KafkaEvent;
+import org.apache.flink.streaming.kafka.test.base.KafkaEventSchema;
+import org.apache.flink.streaming.kafka.test.base.KafkaExampleUtil;
+import org.apache.flink.streaming.kafka.test.base.RollingAdditionMapper;
+
+import java.net.URL;
+import java.util.Properties;
+
+/**
+ * A simple example that shows how to read from and write to Kinesis. This will read String messages
+ * from the input stream, parse them into a POJO type {@link KafkaEvent}, group by some key, and finally
+ * perform a rolling addition on each key, writing the results back to another stream.
+ *
+ * <p>This example also demonstrates using a watermark assigner to generate per-partition
+ * watermarks directly in the Flink Kinesis consumer. For demonstration purposes, it is assumed that
+ * the String messages are formatted as a (word,frequency,timestamp) tuple.
+ *
+ * <p>Example usage:
+ *     --input-stream test-input --output-stream test-output --aws.endpoint https://localhost:4567 --flink.stream.initpos TRIM_HORIZON
+ */
+public class KinesisExample {
+       public static void main(String[] args) throws Exception {
+               // parse input arguments
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+               StreamExecutionEnvironment env = KafkaExampleUtil.prepareExecutionEnv(parameterTool);
+
+               String inputStream = parameterTool.getRequired("input-stream");
+               String outputStream = parameterTool.getRequired("output-stream");
+
+               FlinkKinesisConsumer<KafkaEvent> consumer = new FlinkKinesisConsumer<>(
+                       inputStream,
+                       new KafkaEventSchema(),
+                       parameterTool.getProperties());
+               consumer.setPeriodicWatermarkAssigner(new CustomWatermarkExtractor());
+
+               Properties producerProperties = new Properties(parameterTool.getProperties());
+               // the producer needs a region even when an endpoint URL is specified
+               producerProperties.putIfAbsent(ConsumerConfigConstants.AWS_REGION, "us-east-1");
+               // the test driver does not deaggregate
+               producerProperties.putIfAbsent("AggregationEnabled", String.valueOf(false));
+
+               // the KPL does not recognize an endpoint URL, so split it into host and port
+               String kinesisUrl = producerProperties.getProperty(ConsumerConfigConstants.AWS_ENDPOINT);
+               if (kinesisUrl != null) {
+                       URL url = new URL(kinesisUrl);
+                       producerProperties.put("KinesisEndpoint", url.getHost());
+                       producerProperties.put("KinesisPort", Integer.toString(url.getPort()));
+                       producerProperties.put("VerifyCertificate", "false");
+               }
+
+               FlinkKinesisProducer<KafkaEvent> producer = new FlinkKinesisProducer<>(
+                       new KafkaEventSchema(),
+                       producerProperties);
+               producer.setDefaultStream(outputStream);
+               producer.setDefaultPartition("fakePartition");
+
+               DataStream<KafkaEvent> input = env
+                       .addSource(consumer)
+                       .keyBy("word")
+                       .map(new RollingAdditionMapper());
+
+               input.addSink(producer);
+               env.execute();
+       }
+}
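
Note: the KinesisEndpoint/KinesisPort translation above exists because the
Kinesis Producer Library does not accept the aws.endpoint URL that the
consumer uses. A minimal standalone sketch of that translation (class name
and endpoint are hypothetical; the URL matches the local Kinesalite address
used by the test script):

    import java.net.URL;
    import java.util.Properties;

    public class EndpointSplitSketch {
        public static void main(String[] args) throws Exception {
            Properties producerProperties = new Properties();
            URL url = new URL("https://localhost:4567");
            // the KPL wants host and port as separate properties
            producerProperties.put("KinesisEndpoint", url.getHost());               // "localhost"
            producerProperties.put("KinesisPort", Integer.toString(url.getPort())); // "4567"
            producerProperties.put("VerifyCertificate", "false"); // Kinesalite serves a self-signed certificate
            System.out.println(producerProperties);
        }
    }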
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
new file mode 100644
index 0000000..1a6d6d7
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisExampleTest.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.api.java.utils.ParameterTool;
+
+import org.junit.Assert;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.atomic.AtomicReference;
+
+/**
+ * Test driver for {@link KinesisExample#main}.
+ */
+public class KinesisExampleTest {
+       private static final Logger LOG = LoggerFactory.getLogger(KinesisExampleTest.class);
+
+       /**
+        * Interface to the pubsub system for this test.
+        */
+       interface PubsubClient {
+               void createTopic(String topic, int partitions, Properties props) throws Exception;
+
+               void sendMessage(String topic, String msg);
+
+               List<String> readAllMessages(String streamName) throws Exception;
+       }
+
+       public static void main(String[] args) throws Exception {
+               LOG.info("System properties: {}", System.getProperties());
+               final ParameterTool parameterTool = ParameterTool.fromArgs(args);
+
+               String inputStream = parameterTool.getRequired("input-stream");
+               String outputStream = parameterTool.getRequired("output-stream");
+
+               PubsubClient pubsub = new KinesisPubsubClient(parameterTool.getProperties());
+               pubsub.createTopic(inputStream, 2, parameterTool.getProperties());
+               pubsub.createTopic(outputStream, 2, parameterTool.getProperties());
+
+               // The example job needs to start after the streams are created and run in parallel to the validation logic.
+               // The thread that runs the job won't terminate; we don't have a job reference to cancel it.
+               // Once results are validated, the driver main thread will exit; the job/cluster will be terminated from the script.
+               final AtomicReference<Exception> executeException = new AtomicReference<>();
+               Thread executeThread =
+                       new Thread(
+                               () -> {
+                                       try {
+                                               KinesisExample.main(args);
+                                               // this message won't appear in the log,
+                                               // as the job is terminated when the cluster shuts down
+                                               LOG.info("executed program");
+                                       } catch (Exception e) {
+                                               executeException.set(e);
+                                       }
+                               });
+               executeThread.start();
+
+               // generate input
+               String[] messages = {
+                       "elephant,5,45218",
+                       "squirrel,12,46213",
+                       "bee,3,51348",
+                       "squirrel,22,52444",
+                       "bee,10,53412",
+                       "elephant,9,54867"
+               };
+               for (String msg : messages) {
+                       pubsub.sendMessage(inputStream, msg);
+               }
+               LOG.info("generated records");
+
+               Deadline deadline = Deadline.fromNow(Duration.ofSeconds(60));
+               List<String> results = pubsub.readAllMessages(outputStream);
+               while (deadline.hasTimeLeft() && executeException.get() == null && results.size() < messages.length) {
+                       LOG.info("waiting for results..");
+                       Thread.sleep(1000);
+                       results = pubsub.readAllMessages(outputStream);
+               }
+
+               if (executeException.get() != null) {
+                       throw executeException.get();
+               }
+
+               LOG.info("results: {}", results);
+               Assert.assertEquals("Results received from '" + outputStream + "': " + results,
+                       messages.length, results.size());
+
+               String[] expectedResults = {
+                       "elephant,5,45218",
+                       "elephant,14,54867",
+                       "squirrel,12,46213",
+                       "squirrel,34,52444",
+                       "bee,3,51348",
+                       "bee,13,53412"
+               };
+
+               for (String expectedResult : expectedResults) {
+                       Assert.assertTrue(expectedResult, results.contains(expectedResult));
+               }
+
+               // TODO: the main thread needs to create a job or the CLI fails with:
+               // "The program didn't contain a Flink job. Perhaps you forgot to call execute() on the execution environment."
+               System.out.println("test finished");
+               System.exit(0);
+       }
+
+}
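
Note: the expected results above are the per-word rolling sums of the
generated input, each paired with the record's timestamp. A small
standalone sketch (class name hypothetical) that reproduces the expected
values:

    import java.util.HashMap;
    import java.util.Map;

    public class RollingSumSketch {
        public static void main(String[] args) {
            String[] messages = {
                "elephant,5,45218", "squirrel,12,46213", "bee,3,51348",
                "squirrel,22,52444", "bee,10,53412", "elephant,9,54867"
            };
            Map<String, Integer> sums = new HashMap<>();
            for (String msg : messages) {
                String[] parts = msg.split(",");
                // merge returns the updated running sum for the word
                int sum = sums.merge(parts[0], Integer.parseInt(parts[1]), Integer::sum);
                System.out.println(parts[0] + "," + sum + "," + parts[2]);
            }
            // prints, among others, "squirrel,34,52444" and "elephant,14,54867",
            // matching expectedResults in the test above
        }
    }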
diff --git a/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
new file mode 100644
index 0000000..486b565
--- /dev/null
+++ b/flink-end-to-end-tests/flink-streaming-kinesis-test/src/main/java/org/apache/flink/streaming/kinesis/test/KinesisPubsubClient.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.kinesis.test;
+
+import org.apache.flink.api.common.time.Deadline;
+import org.apache.flink.kinesis.shaded.com.amazonaws.AmazonClientException;
+import org.apache.flink.kinesis.shaded.com.amazonaws.auth.AWSCredentialsProvider;
+import org.apache.flink.kinesis.shaded.com.amazonaws.auth.EnvironmentVariableCredentialsProvider;
+import org.apache.flink.kinesis.shaded.com.amazonaws.client.builder.AwsClientBuilder;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesis;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.AmazonKinesisClientBuilder;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.GetRecordsResult;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordRequest;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.PutRecordResult;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.Record;
+import org.apache.flink.kinesis.shaded.com.amazonaws.services.kinesis.model.ResourceNotFoundException;
+import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;
+import org.apache.flink.streaming.connectors.kinesis.model.StreamShardHandle;
+import org.apache.flink.streaming.connectors.kinesis.proxy.GetShardListResult;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxy;
+import org.apache.flink.streaming.connectors.kinesis.proxy.KinesisProxyInterface;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+
+class KinesisPubsubClient implements KinesisExampleTest.PubsubClient {
+       private static final Logger LOG = LoggerFactory.getLogger(KinesisPubsubClient.class);
+
+       private final AmazonKinesis kinesisClient;
+       private final Properties properties;
+
+       KinesisPubsubClient(Properties properties) {
+               this.kinesisClient = createClientWithCredentials(properties);
+               this.properties = properties;
+       }
+
+       @Override
+       public void createTopic(String stream, int shards, Properties props) throws Exception {
+               try {
+                       kinesisClient.describeStream(stream);
+                       kinesisClient.deleteStream(stream);
+               } catch (ResourceNotFoundException rnfe) {
+                       // expected when stream doesn't exist
+               }
+
+               kinesisClient.createStream(stream, shards);
+               Deadline deadline = Deadline.fromNow(Duration.ofSeconds(5));
+               while (deadline.hasTimeLeft()) {
+                       try {
+                               Thread.sleep(250); // sleep for a bit for the stream to be created
+                               if (kinesisClient.describeStream(stream).getStreamDescription()
+                                       .getShards().size() != shards) {
+                                       // not fully created yet
+                                       continue;
+                               }
+                               break;
+                       } catch (ResourceNotFoundException rnfe) {
+                               // not ready yet
+                       }
+               }
+       }
+
+       @Override
+       public void sendMessage(String topic, String msg) {
+               PutRecordRequest putRecordRequest = new PutRecordRequest();
+               putRecordRequest.setStreamName(topic);
+               putRecordRequest.setPartitionKey("fakePartitionKey");
+               putRecordRequest.withData(ByteBuffer.wrap(msg.getBytes()));
+               PutRecordResult putRecordResult = kinesisClient.putRecord(putRecordRequest);
+               LOG.info("added record: {}", putRecordResult.getSequenceNumber());
+       }
+
+       @Override
+       public List<String> readAllMessages(String streamName) throws Exception {
+               KinesisProxyInterface kinesisProxy = KinesisProxy.create(properties);
+               Map<String, String> streamNamesWithLastSeenShardIds = new HashMap<>();
+               streamNamesWithLastSeenShardIds.put(streamName, null);
+
+               GetShardListResult shardListResult = kinesisProxy.getShardList(streamNamesWithLastSeenShardIds);
+               int maxRecordsToFetch = 10;
+
+               List<String> messages = new ArrayList<>();
+               // retrieve records from all shards
+               for (StreamShardHandle ssh : shardListResult.getRetrievedShardListOfStream(streamName)) {
+                       String shardIterator = kinesisProxy.getShardIterator(ssh, "TRIM_HORIZON", null);
+                       GetRecordsResult getRecordsResult = kinesisProxy.getRecords(shardIterator, maxRecordsToFetch);
+                       List<Record> aggregatedRecords = getRecordsResult.getRecords();
+                       for (Record record : aggregatedRecords) {
+                               messages.add(new String(record.getData().array()));
+                       }
+               }
+               return messages;
+       }
+
+       private static AmazonKinesis createClientWithCredentials(Properties props) throws AmazonClientException {
+               AWSCredentialsProvider credentialsProvider = new EnvironmentVariableCredentialsProvider();
+               return AmazonKinesisClientBuilder.standard()
+                       .withCredentials(credentialsProvider)
+                       .withEndpointConfiguration(
+                               new AwsClientBuilder.EndpointConfiguration(
+                                       props.getProperty(ConsumerConfigConstants.AWS_ENDPOINT), "us-east-1"))
+                       .build();
+       }
+
+}
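
Note: a hypothetical usage sketch of the client above against the local
Kinesalite endpoint. The class is package-private, so the sketch assumes it
lives in the same package; credentials are read from the environment
variables exported by test_streaming_kinesis.sh:

    package org.apache.flink.streaming.kinesis.test;

    import java.util.List;
    import java.util.Properties;

    import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

    public class PubsubClientUsageSketch {
        public static void main(String[] args) throws Exception {
            // assumes AWS_ACCESS_KEY_ID and AWS_SECRET_KEY are exported, as in the test script
            Properties props = new Properties();
            props.setProperty(ConsumerConfigConstants.AWS_ENDPOINT, "https://localhost:4567");
            KinesisPubsubClient client = new KinesisPubsubClient(props);
            client.createTopic("test-input", 2, props);
            client.sendMessage("test-input", "elephant,5,45218");
            List<String> messages = client.readAllMessages("test-input");
            System.out.println(messages); // expect the record sent above
        }
    }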
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index f6c36dc..b50ca6b 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,6 +68,27 @@ under the License.
                <module>flink-streaming-kafka010-test</module>
        </modules>
 
+       <!-- See main pom.xml for explanation of profiles -->
+       <profiles>
+               <!--
+                       We include the Kinesis module only optionally because it contains a dependency
+                       licensed under the "Amazon Software License".
+                       In accordance with the discussion in https://issues.apache.org/jira/browse/LEGAL-198,
+                       this is an optional module for Flink.
+               -->
+               <profile>
+                       <id>include-kinesis</id>
+                       <activation>
+                               <property>
+                                       <name>include-kinesis</name>
+                               </property>
+                       </activation>
+                       <modules>
+                               <module>flink-streaming-kinesis-test</module>
+                       </modules>
+               </profile>
+       </profiles>
+
        <build>
                <plugins>
                        <plugin>
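
Note: with this property-based activation, the Kinesis test module only
builds when -Dinclude-kinesis is passed to Maven; the default build skips
it because of the license restriction described in the comment above.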
diff --git a/flink-end-to-end-tests/run-pre-commit-tests.sh b/flink-end-to-end-tests/run-pre-commit-tests.sh
index 4a9a1a0..f40fd56 100755
--- a/flink-end-to-end-tests/run-pre-commit-tests.sh
+++ b/flink-end-to-end-tests/run-pre-commit-tests.sh
@@ -56,6 +56,7 @@ run_test "Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_batch_wo
 run_test "Kafka 0.10 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka010.sh"
 run_test "Kafka 0.11 end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka011.sh"
 run_test "Modern Kafka end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kafka.sh"
+run_test "Kinesis end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_kinesis.sh"
 run_test "class loading end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_classloader.sh"
 run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_hadoop_s3a.sh"
 run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
diff --git a/flink-end-to-end-tests/test-scripts/common.sh b/flink-end-to-end-tests/test-scripts/common.sh
index c06249e..1a54aba 100644
--- a/flink-end-to-end-tests/test-scripts/common.sh
+++ b/flink-end-to-end-tests/test-scripts/common.sh
@@ -47,7 +47,7 @@ cd $TEST_INFRA_DIR
 TEST_INFRA_DIR=`pwd -P`
 cd $TEST_ROOT
 
-NODENAME=`hostname -f`
+NODENAME=${NODENAME:-`hostname -f`}
 
 # REST_PROTOCOL and CURL_SSL_ARGS can be modified in common_ssl.sh if SSL is activated
 # they should be used in curl command to query Flink REST API
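
Note: the NODENAME change above lets the environment (e.g. CI) override the
node name by exporting NODENAME before sourcing common.sh; the
${NODENAME:-`hostname -f`} expansion falls back to `hostname -f` when the
variable is unset.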
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
new file mode 100755
index 0000000..1981b13
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kinesis.sh
@@ -0,0 +1,63 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# Kinesalite doesn't support CBOR
+export AWS_CBOR_DISABLE=1
+
+# Required by the KPL native process
+export AWS_ACCESS_KEY_ID=flinkKinesisTestFakeAccessKeyId
+export AWS_SECRET_KEY=flinkKinesisTestFakeAccessKey
+
+KINESALITE_PORT=4567
+
+#docker run -d --rm --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite
+# override entrypoint to enable SSL
+docker run -d --rm --entrypoint "/tini" --name flink-test-kinesis -p ${KINESALITE_PORT}:${KINESALITE_PORT} instructure/kinesalite -- /usr/src/app/node_modules/kinesalite/cli.js --path /var/lib/kinesalite --ssl
+
+# reveal potential issues with the container in the CI environment
+docker logs flink-test-kinesis
+
+function test_cleanup {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+  # job needs to stop before kinesalite
+  stop_cluster
+  echo "terminating kinesalite"
+  docker kill flink-test-kinesis
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# prefix com.amazonaws.sdk.disableCertChecking to account for shading
+DISABLE_CERT_CHECKING_JAVA_OPTS="-Dorg.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCertChecking"
+
+export FLINK_ENV_JAVA_OPTS=${DISABLE_CERT_CHECKING_JAVA_OPTS}
+start_cluster
+
+TEST_JAR="${END_TO_END_DIR}/flink-streaming-kinesis-test/target/KinesisExample.jar"
+JVM_ARGS=${DISABLE_CERT_CHECKING_JAVA_OPTS} \
+$FLINK_DIR/bin/flink run -p 1 -c org.apache.flink.streaming.kinesis.test.KinesisExampleTest $TEST_JAR \
+  --input-stream test-input --output-stream test-output \
+  --aws.endpoint https://localhost:${KINESALITE_PORT} --aws.credentials.provider.basic.secretkey fakekey --aws.credentials.provider.basic.accesskeyid fakeid \
+  --flink.stream.initpos TRIM_HORIZON \
+  --flink.partition-discovery.interval-millis 1000
