Repository: flink
Updated Branches:
  refs/heads/master aa88a425b -> 6d0d366eb


[FLINK-8979] [test] Refactor Kafka-related common end-to-end test scripts


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0821f491
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0821f491
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0821f491

Branch: refs/heads/master
Commit: 0821f4918fc06249d2dea1fa4ef9b2a034b5a83b
Parents: aa88a42
Author: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Authored: Tue Mar 27 18:06:19 2018 +0800
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Tue Apr 3 15:37:04 2018 +0800

----------------------------------------------------------------------
 .../test-scripts/kafka-common.sh                | 74 ++++++++++++++++++++
 .../test-scripts/test_resume_savepoint.sh       | 34 ++-------
 .../test-scripts/test_streaming_kafka010.sh     | 51 ++++----------
 3 files changed, 91 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/kafka-common.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/kafka-common.sh b/flink-end-to-end-tests/test-scripts/kafka-common.sh
new file mode 100644
index 0000000..7f05357
--- /dev/null
+++ b/flink-end-to-end-tests/test-scripts/kafka-common.sh
@@ -0,0 +1,74 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+set -o pipefail
+
+if [[ -z $TEST_DATA_DIR ]]; then
+  echo "Must run common.sh before kafka-common.sh."
+  exit 1
+fi
+
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+function setup_kafka_dist {
+  # download Kafka
+  mkdir -p $TEST_DATA_DIR
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+
+  tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+
+  # fix kafka config
+  sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+  sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+}
+
+function start_kafka_cluster {
+  if [[ -z $KAFKA_DIR ]]; then
+    echo "Must run 'setup_kafka_dist' before attempting to start Kafka cluster"
+    exit 1
+  fi
+
+  $KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+  $KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+  # zookeeper outputs the "Node does not exist" bit to stderr
+  while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+    echo "Waiting for broker..."
+    sleep 1
+  done
+}
+
+function stop_kafka_cluster {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+}
+
+function create_kafka_topic {
+  $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor $1 --partitions $2 --topic $3
+}
+
+function send_messages_to_kafka {
+  echo -e $1 | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic $2
+}
+
+function read_messages_from_kafka {
+  $KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic $2 --from-beginning --max-messages $1 2> /dev/null
+}
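
For reference, a minimal sketch of how a test script might use these new helpers, assuming common.sh has been sourced first so that TEST_DATA_DIR is set; the topic name and messages below are illustrative only, not part of this change:

    #!/usr/bin/env bash
    source "$(dirname "$0")"/common.sh
    source "$(dirname "$0")"/kafka-common.sh

    setup_kafka_dist      # download Kafka 0.10.2.0 and patch its configs
    start_kafka_cluster   # start ZooKeeper + broker, wait for broker id 0

    # always tear down Kafka and ZooKeeper, even on failure
    trap stop_kafka_cluster INT EXIT

    create_kafka_topic 1 1 demo                  # replication 1, 1 partition, topic "demo"
    send_messages_to_kafka "a\nb\nc" demo        # newline-separated records
    RECEIVED=$(read_messages_from_kafka 3 demo)  # block until 3 messages are read
    echo "$RECEIVED"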

http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
index 83e0e5a..6642ad5 100755
--- a/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
+++ b/flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh
@@ -23,27 +23,10 @@ if [ -z $1 ] || [ -z $2 ]; then
 fi
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
-
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+setup_kafka_dist
+start_kafka_cluster
 
 ORIGINAL_DOP=$1
 NEW_DOP=$2
@@ -67,8 +50,7 @@ start_cluster
 
 # make sure to stop Kafka and ZooKeeper at the end, as well as cleaning up the Flink cluster and our modifications
 function test_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+  stop_kafka_cluster
 
   # revert our modifications to the Flink distribution
   rm $FLINK_DIR/conf/flink-conf.yaml
@@ -81,14 +63,8 @@ function test_cleanup {
 trap test_cleanup INT
 trap test_cleanup EXIT
 
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
-
 # create the required topic
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+create_kafka_topic 1 1 test-input
 
 # run the state machine example job
 STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d -p $ORIGINAL_DOP $FLINK_DIR/examples/streaming/StateMachineExample.jar \
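
The positional arguments of create_kafka_topic map directly onto the flags of the inline call it replaces; for example, create_kafka_topic 1 1 test-input above is equivalent to:

    $KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
      --replication-factor 1 --partitions 1 --topic test-input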

http://git-wip-us.apache.org/repos/asf/flink/blob/0821f491/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
----------------------------------------------------------------------
diff --git a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
index a6a9a8e..e09be35 100755
--- a/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_kafka010.sh
@@ -18,51 +18,25 @@
 
################################################################################
 
 source "$(dirname "$0")"/common.sh
+source "$(dirname "$0")"/kafka-common.sh
 
-start_cluster
-
-# get Kafka 0.10.0
-mkdir -p $TEST_DATA_DIR
-if [ -z "$3" ]; then
-  # need to download Kafka because no Kafka was specified on the invocation
-  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
-  echo "Downloading Kafka from $KAFKA_URL"
-  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
-else
-  echo "Using specified Kafka from $3"
-  cp $3 $TEST_DATA_DIR/kafka.tgz
-fi
-
-tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
-KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+setup_kafka_dist
+start_kafka_cluster
 
-# fix kafka config
-sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
-sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
-$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
-$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
-
-# make sure to stop Kafka and ZooKeeper at the end
+start_cluster
 
-function kafka_cleanup {
-  $KAFKA_DIR/bin/kafka-server-stop.sh
-  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+function test_cleanup {
+  stop_kafka_cluster
 
   # make sure to run regular cleanup as well
   cleanup
 }
-trap kafka_cleanup INT
-trap kafka_cleanup EXIT
-
-# zookeeper outputs the "Node does not exist" bit to stderr
-while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
-  echo "Waiting for broker..."
-  sleep 1
-done
+trap test_cleanup INT
+trap test_cleanup EXIT
 
 # create the required topics
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
-$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-output
+create_kafka_topic 1 1 test-input
+create_kafka_topic 1 1 test-output
 
 # run the Flink job (detached mode)
 $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
@@ -71,9 +45,8 @@ $FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/Kafka010Example.jar \
   --bootstrap.servers localhost:9092 --zookeeper.connect localhost:2181 --group.id myconsumer --auto.offset.reset earliest
 
 # send some data to Kafka
-echo -e "hello,45218\nwhats,46213\nup,51348" | $KAFKA_DIR/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test-input
-
-DATA_FROM_KAFKA=$($KAFKA_DIR/bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test-output --from-beginning --max-messages 3 2> /dev/null)
+send_messages_to_kafka "hello,45218\nwhats,46213\nup,51348" test-input
+DATA_FROM_KAFKA=$(read_messages_from_kafka 3 test-output)
 
 # make sure we have actual newlines in the string, not "\n"
 EXPECTED=$(printf "PREFIX:hello,45218\nPREFIX:whats,46213\nPREFIX:up,51348")
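
With the producer/consumer plumbing hidden behind send_messages_to_kafka and read_messages_from_kafka, the final verification reduces to a plain string comparison against EXPECTED. A sketch of how such a check can proceed from there (the error handling shown is illustrative and not part of this diff):

    if [[ "$DATA_FROM_KAFKA" != "$EXPECTED" ]]; then
      echo "Output from Flink program does not match expected output."
      echo -e "EXPECTED: $EXPECTED"
      echo -e "ACTUAL:   $DATA_FROM_KAFKA"
      exit 1
    fi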
