[ https://issues.apache.org/jira/browse/FLINK-8975?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16411081#comment-16411081 ]
ASF GitHub Bot commented on FLINK-8975:
---------------------------------------

Github user zentol commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5733#discussion_r176678628

--- Diff: flink-end-to-end-tests/test-scripts/test_resume_savepoint.sh ---
@@ -0,0 +1,155 @@
+#!/usr/bin/env bash
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+source "$(dirname "$0")"/common.sh
+
+# modify configuration to have 2 task slots
+cp $FLINK_DIR/conf/flink-conf.yaml $FLINK_DIR/conf/flink-conf.yaml.bak
+sed -i -e 's/taskmanager.numberOfTaskSlots: 1/taskmanager.numberOfTaskSlots: 2/' $FLINK_DIR/conf/flink-conf.yaml
+
+# modify configuration to use the SLF4J reporter; we will be using this to monitor the state machine's progress
+cp $FLINK_DIR/opt/flink-metrics-slf4j-1.6-SNAPSHOT.jar $FLINK_DIR/lib/
+echo "metrics.reporter.slf4j.class: org.apache.flink.metrics.slf4j.Slf4jReporter" >> $FLINK_DIR/conf/flink-conf.yaml
+echo "metrics.reporter.slf4j.interval: 5 SECONDS" >> $FLINK_DIR/conf/flink-conf.yaml
+
+start_cluster
+
+# get Kafka 0.10.2.0
+mkdir -p $TEST_DATA_DIR
+if [ -z "$3" ]; then
+  # need to download Kafka because no Kafka was specified on the invocation
+  KAFKA_URL="https://archive.apache.org/dist/kafka/0.10.2.0/kafka_2.11-0.10.2.0.tgz"
+  echo "Downloading Kafka from $KAFKA_URL"
+  curl "$KAFKA_URL" > $TEST_DATA_DIR/kafka.tgz
+else
+  echo "Using specified Kafka from $3"
+  cp $3 $TEST_DATA_DIR/kafka.tgz
+fi
+
+tar xzf $TEST_DATA_DIR/kafka.tgz -C $TEST_DATA_DIR/
+KAFKA_DIR=$TEST_DATA_DIR/kafka_2.11-0.10.2.0
+
+# fix Kafka config to use the test data directory
+sed -i -e "s+^\(dataDir\s*=\s*\).*$+\1$TEST_DATA_DIR/zookeeper+" $KAFKA_DIR/config/zookeeper.properties
+sed -i -e "s+^\(log\.dirs\s*=\s*\).*$+\1$TEST_DATA_DIR/kafka+" $KAFKA_DIR/config/server.properties
+$KAFKA_DIR/bin/zookeeper-server-start.sh -daemon $KAFKA_DIR/config/zookeeper.properties
+$KAFKA_DIR/bin/kafka-server-start.sh -daemon $KAFKA_DIR/config/server.properties
+
+# make sure to stop Kafka and ZooKeeper at the end, as well as to clean up the Flink cluster and our modifications
+function test_cleanup {
+  $KAFKA_DIR/bin/kafka-server-stop.sh
+  $KAFKA_DIR/bin/zookeeper-server-stop.sh
+
+  # revert our modifications to the Flink distribution
+  rm $FLINK_DIR/conf/flink-conf.yaml
+  mv $FLINK_DIR/conf/flink-conf.yaml.bak $FLINK_DIR/conf/flink-conf.yaml
+  rm $FLINK_DIR/lib/flink-metrics-slf4j-1.6-SNAPSHOT.jar
+
+  # make sure to run regular cleanup as well
+  cleanup
+}
+trap test_cleanup INT
+trap test_cleanup EXIT
+
+# zookeeper outputs the "Node does not exist" bit to stderr
+while [[ $($KAFKA_DIR/bin/zookeeper-shell.sh localhost:2181 get /brokers/ids/0 2>&1) =~ .*Node\ does\ not\ exist.* ]]; do
+  echo "Waiting for broker..."
+  sleep 1
+done
+
+# create the required topic
+$KAFKA_DIR/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test-input
+
+# run the state machine example job
+STATE_MACHINE_JOB=$($FLINK_DIR/bin/flink run -d $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $STATE_MACHINE_JOB
+
+# then, run the events generator
+EVENTS_GEN_JOB=$($FLINK_DIR/bin/flink run -d -c org.apache.flink.streaming.examples.statemachine.KafkaEventsGeneratorJob $FLINK_DIR/examples/streaming/StateMachineExample.jar \
+  --kafka-topic test-input --sleep 30 \
+  | grep "Job has been submitted with JobID" | sed 's/.* //g')
+
+wait_job_running $EVENTS_GEN_JOB
+
+function get_metric_state_machine_processed_records {
+  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | sed 's/.* //g' | tail -1
+}
+
+function get_num_metric_samples {
+  grep ".State machine job.Flat Map -> Sink: Print to Std. Out.0.numRecordsIn:" $FLINK_DIR/log/*taskexecutor*.log | wc -l
+}
+
+# monitor the numRecordsIn metric of the state machine operator;
+# only proceed to the savepoint once the operator has processed at least 200 records
+while : ; do
+  NUM_RECORDS=$(get_metric_state_machine_processed_records)
+
+  if [ -z "$NUM_RECORDS" ]; then
+    NUM_RECORDS=0
+  fi
+
+  if (( $NUM_RECORDS < 200 )); then
+    echo "Waiting for the state machine job to process at least 200 records, current progress: $NUM_RECORDS records ..."
+    sleep 5
+  else
+    break
+  fi
done
+
+# take a savepoint of the state machine job
+SAVEPOINT_PATH=$(take_savepoint $STATE_MACHINE_JOB $TEST_DATA_DIR \
+  | grep "Savepoint completed. Path:" | sed 's/.* //g')
+
+cancel_job $STATE_MACHINE_JOB
+
+# Since it is not possible to differentiate reporter output between the first and second exuection,
--- End diff --

typo: execution


> End-to-end test: Resume from savepoint
> --------------------------------------
>
>                 Key: FLINK-8975
>                 URL: https://issues.apache.org/jira/browse/FLINK-8975
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Tests
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.5.0
>
>
> Users usually take a savepoint and later want to resume from it. In order to verify that Flink supports this feature, we should add an end-to-end test which scripts this behavior. We should use the general-purpose testing job (FLINK-8971) with failures disabled for that.
> The end-to-end test should do the following:
> * Submit the FLINK-8971 job
> * Take a savepoint and verify that the savepoint is there
> * Cancel the job and resume from the savepoint
> * Verify that the job could be resumed
> * Use different StateBackends: RocksDB incremental async/sync, RocksDB full async/sync, FsStateBackend async/sync
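A rough sketch of how the trigger/cancel/resume steps from the issue description map onto the standard Flink CLI is shown below. It reuses $FLINK_DIR, $TEST_DATA_DIR, and the StateMachineExample jar from the script above; $JOB_ID is a placeholder for the submitted job's id, and the actual test may route these steps through the common.sh helpers instead:

    # trigger a savepoint for the running job and capture the reported path
    SAVEPOINT_PATH=$($FLINK_DIR/bin/flink savepoint $JOB_ID $TEST_DATA_DIR \
      | grep "Savepoint completed. Path:" | sed 's/.* //g')

    # cancel the job, then resume it from the savepoint (-s) in detached mode (-d)
    $FLINK_DIR/bin/flink cancel $JOB_ID
    RESUMED_JOB=$($FLINK_DIR/bin/flink run -d -s $SAVEPOINT_PATH \
      $FLINK_DIR/examples/streaming/StateMachineExample.jar --kafka-topic test-input \
      | grep "Job has been submitted with JobID" | sed 's/.* //g')

    # verify that the resumed job is actually running again
    wait_job_running $RESUMED_JOB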