[GitHub] flink pull request #6419: [FLINK-9949][tests] Kill Flink processes in DB/tea...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6419 [FLINK-9949][tests] Kill Flink processes in DB/teardown ## What is the purpose of the change *Not killing Flink processes at the end of a test can cause interference with subsequent test runs.* ## Brief change log - *Kill Flink processes in `DB/teardown!`.* ## Verifying this change This change added tests and can be verified as follows: - *Ran tests in Docker.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9949 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6419.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6419 commit db3cecb5c9fb16d707e02b436244ba8fd5ee1ce8 Author: gyao Date: 2018-07-25T13:28:40Z [FLINK-9949][tests] Kill Flink processes in DB/teardown ---
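The PR performs this cleanup in the Jepsen test's `DB/teardown!` (Clojure). Purely as an illustration of the idea, here is a minimal Java sketch that finds Flink JVMs by main-class name and force-kills them. It assumes that matching `StandaloneSessionClusterEntrypoint` or `TaskManagerRunner` in a process's command line is a good enough signal; the class and method names are hypothetical and not part of the PR.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class FlinkProcessReaper {

    // Assumption: Flink JVMs are recognized by these main-class names in their command line.
    static final List<String> FLINK_MAIN_CLASSES =
            List.of("StandaloneSessionClusterEntrypoint", "TaskManagerRunner");

    /** Returns, in ascending order, the PIDs whose command line mentions a Flink main class. */
    static List<Long> selectFlinkPids(Map<Long, String> commandLinesByPid) {
        return commandLinesByPid.entrySet().stream()
                .filter(e -> FLINK_MAIN_CLASSES.stream().anyMatch(name -> e.getValue().contains(name)))
                .map(Map.Entry::getKey)
                .sorted()
                .collect(Collectors.toList());
    }

    /** Forcibly destroys each process that is still alive (Java 9+ ProcessHandle API). */
    static void killAll(List<Long> pids) {
        for (long pid : pids) {
            ProcessHandle.of(pid).ifPresent(ProcessHandle::destroyForcibly);
        }
    }

    public static void main(String[] args) {
        // Snapshot of visible processes: PID -> command line, read once per process
        // (only where the OS exposes the command line).
        Map<Long, String> snapshot = new HashMap<>();
        ProcessHandle.allProcesses().forEach(p ->
                p.info().commandLine().ifPresent(cmd -> snapshot.put(p.pid(), cmd)));
        List<Long> targets = selectFlinkPids(snapshot);
        System.out.println("Flink processes to kill: " + targets);
        killAll(targets);
    }
}
```

Keeping the selection logic in a pure function means it can be tested without killing anything. The teardown step then only calls `killAll` on the result.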
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205110205 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java --- @@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); int loadFactor = Integer.parseInt(params.getRequired("loadFactor")); String outputPath = params.getRequired("outputPath"); + boolean infinite = params.getBoolean("infinite", false); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); int numKeys = loadFactor * 128 * 1024; - DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4); + DataSet<Tuple2<String, Integer>> x1Keys; DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4); DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4); + if (infinite) { + x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0); --- End diff -- Yes, I didn't want to pass elements from the infinite source down the graph, so as not to run into OOMs due to its infinite nature. That's why I am filtering out those elements. Do you think we should pass them anyway? ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205106925 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java --- @@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); int loadFactor = Integer.parseInt(params.getRequired("loadFactor")); String outputPath = params.getRequired("outputPath"); + boolean infinite = params.getBoolean("infinite", false); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); int numKeys = loadFactor * 128 * 1024; - DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4); + DataSet<Tuple2<String, Integer>> x1Keys; DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4); DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4); + if (infinite) { + x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0); --- End diff -- This now filters out all data. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205108213 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * + * String: key, can be repeated. + * Integer: uniformly distributed int between 0 and 127 + * + * + * If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. --- End diff -- outdated ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205108561 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,147 @@ [...]
+ */ +public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> { + + // total number of records + private final long numRecords; + // total number of keys + private final long numKeys; + + // records emitted per partition + private long recordsPerPartition; + // number of keys per partition + private long keysPerPartition; + + // number of currently emitted records + private long recordCnt; + + // id of current partition + private int partitionId; + + private final boolean infinite; + + public Generator(long numKeys, int recordsPerKey) { + this(numKeys, recordsPerKey, false); + } + + private Generator(long numKeys, int recordsPerKey, boolean infinite) { + this.numKeys = numKeys; + this.numRecords = numKeys * recordsPerKey; + this.infinite = infinite; + } + + public static Generator infinite() { + return new Generator(Long.MAX_VALUE, 1, true); + } + + @Override + public void configure(Configuration parameters) { } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public GenericInputSplit[] createInputSplits(int minNumSplits) { + + GenericInputSplit[] splits = new GenericInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new GenericInputSplit(i, minNumSplits); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(GenericInputSplit split) throws IOException { + this.partitionId = split.getSplitNumber(); + // total number of partitions + int numPartitions = split.getTotalNumberOfSplits(); + + // ensure even distribution of records and keys + Preconditions.checkArgument( + numRecords % numPartitions == 0, + "Records cannot be evenly distributed among partitions"); + Preconditions.checkArgument( + numKeys % numPartitions == 0, + "Keys cannot be evenly distributed among partitions"); + 
this.recordsPerPartition = numRecords / numPartitions; + this.keysPerPartition = numKeys / numPartitions; + + this.recordCnt = 0; + } + + @Override + public boolean reachedEnd() { + return this.recordCnt >= this.recordsPerP
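The `open(...)` method above refuses to run unless records and keys divide evenly across partitions. A minimal sketch of that arithmetic follows, using a plain `IllegalArgumentException` in place of Flink's `Preconditions`. The helper name `perPartition` and the chosen `loadFactor` are hypothetical; the key formula and the parallelism of 4 come from the test program diff.

```java
public class EvenSplit {

    /** Items per partition; throws if total cannot be split evenly (mirrors the checks in Generator#open). */
    static long perPartition(long total, int numPartitions, String what) {
        if (total % numPartitions != 0) {
            throw new IllegalArgumentException(what + " cannot be evenly distributed among partitions");
        }
        return total / numPartitions;
    }

    public static void main(String[] args) {
        int loadFactor = 1;                        // hypothetical value of --loadFactor
        long numKeys = loadFactor * 128L * 1024;   // same formula as the test program
        int recordsPerKey = 8;                     // like the x8Keys input
        long numRecords = numKeys * recordsPerKey;
        int numPartitions = 4;                     // matches setParallelism(4)

        System.out.println(perPartition(numRecords, numPartitions, "Records") + " records and "
                + perPartition(numKeys, numPartitions, "Keys") + " keys per partition");
    }
}
```

With `loadFactor = 1` this prints `262144 records and 32768 keys per partition`. Any total that is not a multiple of the parallelism fails fast instead of silently skewing partitions.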
[GitHub] flink pull request #6396: [FLINK-9806][docs] Add canonical link element to d...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6396 ---
[GitHub] flink pull request #6407: [FLINK-9877][docs] Add documentation page for diff...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6407#discussion_r205097411 --- Diff: docs/dev/stream/operators/joining.md --- @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where(<KeySelector>) +.equalTo(<KeySelector>) +.window(<WindowAssigner>) +.apply(<JoinFunction>) +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example, a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. + +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function.
Because this behaves like an inner join, elements of one stream that do not have elements from the other stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java --- End diff -- please use `{% highlight java %}` syntax ---
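The semantics in the quoted docs (pairwise inner join per tumbling window, result timestamp = largest timestamp still in the window) can be checked with a small plain-Java simulation. This is not Flink's implementation. It assumes a single key, non-negative timestamps and a zero window offset, and the element data is hypothetical, chosen to reproduce the `[6,7]` case from the example.

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingWindowJoinSketch {

    /** A timestamped element; all elements share one key, so the key is omitted. */
    static final class Element {
        final String label;
        final long timestamp;

        Element(String label, long timestamp) {
            this.label = label;
            this.timestamp = timestamp;
        }
    }

    /** Start of the tumbling window that contains ts (non-negative timestamps, zero offset). */
    static long windowStart(long ts, long size) {
        return ts - (ts % size);
    }

    /** Largest timestamp that still lies in the window [start, start + size). */
    static long windowMaxTimestamp(long start, long size) {
        return start + size - 1;
    }

    /** Inner join: every left/right pair in the same window, stamped with the window's max timestamp. */
    static List<String> join(List<Element> left, List<Element> right, long size) {
        List<String> out = new ArrayList<>();
        for (Element l : left) {
            for (Element r : right) {
                long start = windowStart(l.timestamp, size);
                if (start == windowStart(r.timestamp, size)) {
                    out.add(l.label + "," + r.label + "@" + windowMaxTimestamp(start, size));
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Hypothetical data: orange elements at 0, 1, 6, 7 and green elements at 0, 3; window size 2 ms.
        List<Element> orange = List.of(new Element("o0", 0), new Element("o1", 1),
                new Element("o6", 6), new Element("o7", 7));
        List<Element> green = List.of(new Element("g0", 0), new Element("g3", 3));
        System.out.println(join(orange, green, 2));
    }
}
```

This prints `[o0,g0@1, o1,g0@1]`. Window `[6,7]` emits nothing because it has no green element, and `g3` is dropped because window `[2,3]` has no orange element. The nested loop is only for clarity; a real operator would buffer elements per key and window rather than compare every pair.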
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205087147 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- An infinite flag should be sufficient. ---
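The takeover check in `verify_logs` counts how many distinct JobManager log files contain `Recovered SubmittedJobGraph` and compares that count with `JM_FAILURES`. A minimal Java sketch of the same logic follows. It assumes the logs are already loaded into a map and that the map holds only the `*standalonesession*.log` files; the class, method and file names are hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

public class TakeoverCheck {

    static final String MARKER = "Recovered SubmittedJobGraph";

    /**
     * Number of log files containing the marker at least once. This mirrors
     * grep -r ... | cut -d ":" -f 1 | uniq | wc -l: grep prints all matches of one
     * file consecutively, so uniq collapses them into one line per file.
     */
    static long filesWithRecovery(Map<String, String> logContentsByFile) {
        return logContentsByFile.values().stream().filter(content -> content.contains(MARKER)).count();
    }

    /** True if exactly jmFailures JobManagers logged that they recovered the job graph. */
    static boolean everyReplacementTookOver(Map<String, String> logContentsByFile, int jmFailures) {
        return filesWithRecovery(logContentsByFile) == jmFailures;
    }

    public static void main(String[] args) {
        Map<String, String> logs = new TreeMap<>(); // hypothetical file names and contents
        logs.put("flink-standalonesession-0.log", "Starting job ...");
        logs.put("flink-standalonesession-1.log", "Recovered SubmittedJobGraph(...)\nRecovered SubmittedJobGraph(...)");
        logs.put("flink-standalonesession-2.log", "Recovered SubmittedJobGraph(...)");
        System.out.println(everyReplacementTookOver(logs, 2) ? "OK" : "FAILURE: A JM did not take over.");
    }
}
```

Counting files instead of matching lines matters because a single JobManager may log the marker more than once.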
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205084257 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...]
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205084180 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...]
[GitHub] flink pull request #6418: [FLINK-9939][runtime] Mesos: Not setting TMP dirs ...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6418 [FLINK-9939][runtime] Mesos: Not setting TMP dirs causes NPE ## What is the purpose of the change *This fixes a possible NPE when deploying on Mesos.* ## Brief change log - *Add null check to `BootstrapTools.updateTmpDirectoriesInConfiguration(...)`, and add `@Nullable` annotation.* ## Verifying this change This change added tests and can be verified as follows: - *Added unit test to assert that `updateTmpDirectoriesInConfiguration` can handle `null` values.* - *Manually deployed on Mesos.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9939 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6418.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6418 commit eb03ada77db7633d330be7847dbdf9ee801a9bee Author: gyao Date: 2018-07-25T10:23:20Z [hotfix][tests] Fix checkstyle violations in BootstrapToolsTest. 
commit 261d4d7423b9ca179ac0004625f51b7b71655d63 Author: gyao Date: 2018-07-25T11:57:34Z [FLINK-9939][runtime] Add null check before setting tmp dirs in config. ---
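The fix described above is a null check before the temporary directories are written into the configuration. The sketch below is a hypothetical simplification of that guard. It uses a plain `Map` instead of Flink's `Configuration`, and it assumes `io.tmp.dirs` as the configuration key. It is not the actual `BootstrapTools.updateTmpDirectoriesInConfiguration(...)` code.

```java
import java.util.HashMap;
import java.util.Map;

public class TmpDirsSketch {

    // Assumption: the key under which Flink reads its temporary directories.
    static final String TMP_DIRS_KEY = "io.tmp.dirs";

    /**
     * Hypothetical simplification of the fix: a user-configured value wins; otherwise the
     * environment-provided directories are applied only if they are non-null.
     */
    static void updateTmpDirectories(Map<String, String> config, String defaultDirs) {
        if (config.containsKey(TMP_DIRS_KEY)) {
            return; // explicitly configured directories take precedence
        }
        if (defaultDirs != null) { // null check: per the PR, this can be null when deploying on Mesos
            config.put(TMP_DIRS_KEY, defaultDirs);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        updateTmpDirectories(config, null);          // no default available: config stays empty
        System.out.println(config);
        updateTmpDirectories(config, "/tmp/flink");  // default available: it is applied
        System.out.println(config);
    }
}
```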
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205082457 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- You are right. Do you think we need the changes in the `Generator` source, or shall I just introduce a flag to make the source infinite? ---
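The alternative discussed here replaces the control file with an "infinite" flag on the source. The following is a minimal sketch of such a flag, assuming a simple counting source. `CountingSource` is a hypothetical class; the actual `reachedEnd()` of `Generator` is truncated in the quoted diff, so this does not reproduce it.

```java
public class CountingSource {

    private final long numRecords;
    private final boolean infinite;
    private long emitted;

    CountingSource(long numRecords) {
        this(numRecords, false);
    }

    private CountingSource(long numRecords, boolean infinite) {
        this.numRecords = numRecords;
        this.infinite = infinite;
    }

    /** Factory for a source that never reports the end of its input. */
    static CountingSource infinite() {
        return new CountingSource(Long.MAX_VALUE, true);
    }

    /** A finite source ends after numRecords records; an infinite one never does. */
    boolean reachedEnd() {
        return !infinite && emitted >= numRecords;
    }

    long nextRecord() {
        return emitted++;
    }

    public static void main(String[] args) {
        CountingSource finite = new CountingSource(3);
        while (!finite.reachedEnd()) {
            System.out.println("finite record " + finite.nextRecord());
        }
        CountingSource endless = CountingSource.infinite();
        for (int i = 0; i < 5 && !endless.reachedEnd(); i++) {
            endless.nextRecord();
        }
        System.out.println("infinite source ended? " + endless.reachedEnd());
    }
}
```

Compared with polling a control file, the flag keeps the source self-contained. The job then ends only when the test cancels it.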
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205081350 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- But the loop executes `wait_job_running ${JOB_ID}`, so don't we know it _actually_ restarted? ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205080760 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- We've verified that it started being recovered: we check for the string "Recovered SubmittedJobGraph". It might still happen that the job won't be able to run, e.g. because it cannot allocate resources. I think with cancel we would not cancel, e.g., this one: https://issues.apache.org/jira/browse/FLINK-9635. Am I correct? ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205078058 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- We've already verified that the job is properly restarted in the jobmanager-kill loop, so this should only be about shutting down the job. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205076492 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- Probably we could, just for testing the HA. This way, though, we ensure it completes successfully after all the restarts. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205069073 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java --- @@ -66,14 +59,21 @@ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); int loadFactor = Integer.parseInt(params.getRequired("loadFactor")); String outputPath = params.getRequired("outputPath"); + String source = params.get("source", null); --- End diff -- source parameter is not documented in the javadocs ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070681 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { --- End diff -- same as below ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070467 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070430 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function jm_watchdog() { --- End diff -- move into shared `common-ha.sh` ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205072007 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- Couldn't we simply cancel the job? ---
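For context on the cancel-vs-control-file question: according to the Generator javadoc quoted in this thread, when a control path is given, the source emits dummy elements with value -1 for as long as that file is empty, and appending to `${TEST_DATA_DIR}/control/test.txt` is what lets the job move on. A minimal Java sketch of that gating, under stated assumptions: `ControlFileGate`, `next` and `isReal` are hypothetical names, not the test program's actual code, and `isReal` stands in for a downstream filter that drops the dummies.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

/**
 * Hypothetical sketch of control-file gating: while the control file is empty,
 * a dummy value of -1 is emitted; once something has been appended to it,
 * real values are emitted from then on.
 */
class ControlFileGate {
    private final Path controlPath; // null: no control file, start immediately
    private boolean hasStarted;

    ControlFileGate(Path controlPath) {
        this.controlPath = controlPath;
        this.hasStarted = (controlPath == null);
    }

    /** Returns -1 while the control file is still empty, otherwise realValue. */
    int next(int realValue) {
        if (!hasStarted) {
            try {
                // Latch: once the file is non-empty we never go back to dummies.
                hasStarted = Files.size(controlPath) > 0;
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        return hasStarted ? realValue : -1;
    }

    /** Downstream predicate that drops the dummies (real values are 0..127). */
    static boolean isReal(int value) {
        return value >= 0;
    }
}
```

Latching `hasStarted` means the file is only polled until the first write; after that, the check costs nothing per record.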
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070024 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * + * String: key, can be repeated. + * Integer: uniformly distributed int between 0 and 127 + * + * + * If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. 
+ */ +public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> { + + // total number of records + private final long numRecords; + // total number of keys + private final long numKeys; + + // records emitted per partition + private long recordsPerPartition; + // number of keys per partition + private long keysPerPartition; + + // number of currently emitted records + private long recordCnt; + + // id of current partition + private int partitionId; + + private final String path; + + private boolean hasStarted = true; + + public Generator(long numKeys, int recordsPerKey) { + this(numKeys, recordsPerKey, null); + } + + public Generator(long numKeys, int recordsPerKey, String controlPath) { + this.numKeys = numKeys; + this.numRecords = numKeys * recordsPerKey; + this.path = controlPath; + } + + @Override + public void configure(Configuration parameters) { } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public GenericInputSplit[] createInputSplits(int minNumSplits) { + + GenericInputSplit[] splits = new GenericInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new GenericInputSplit(i, minNumSplits); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(GenericInputSplit split) throws IOException { + this.partitionId = split.getSplitNumber(); + // total number of partitions + int numPartitions = split.getTotalNumberOfSplits(); + + // ensure even distribution of records and keys + Preconditions.checkArgument( + numRecords % numPartitions == 0, + "Records cannot be evenly distributed among partitions"); + Preconditions.checkArgument( + numKeys % numPartitions == 0, + "Keys cannot be evenly distributed among partitions"); + + this.recordsPerPartition = numRecords / numPartitions; + 
this.keysPerPartition = numKeys / numPartitions; + + this.recordCnt = 0; + + if (this.path != null) { + this.hasStarted = false; + } + } + + @
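The two `Preconditions.checkArgument` calls in `open` reject any split count that does not evenly divide both the record count and the key count. Below is a small self-contained sketch of that arithmetic. It uses a local stand-in for Flink's `Preconditions`, and the class name `PartitionPlan` is hypothetical.

```java
/**
 * Sketch of the even-split arithmetic in Generator#open: both the record count
 * and the key count must divide evenly by the number of splits.
 */
final class PartitionPlan {
    final long recordsPerPartition;
    final long keysPerPartition;

    private PartitionPlan(long recordsPerPartition, long keysPerPartition) {
        this.recordsPerPartition = recordsPerPartition;
        this.keysPerPartition = keysPerPartition;
    }

    static PartitionPlan of(long numKeys, int recordsPerKey, int numPartitions) {
        long numRecords = numKeys * recordsPerKey;
        checkArgument(numRecords % numPartitions == 0,
                "Records cannot be evenly distributed among partitions");
        checkArgument(numKeys % numPartitions == 0,
                "Keys cannot be evenly distributed among partitions");
        return new PartitionPlan(numRecords / numPartitions, numKeys / numPartitions);
    }

    // Local stand-in for org.apache.flink.util.Preconditions.checkArgument,
    // which also throws IllegalArgumentException.
    private static void checkArgument(boolean condition, String message) {
        if (!condition) {
            throw new IllegalArgumentException(message);
        }
    }
}
```

For example, with 131072 keys, 8 records per key and 4 splits, each split gets 262144 records spread over 32768 keys. With 10 keys and 4 splits, the plan is rejected up front, before any record is emitted.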
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205071341 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070634 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ [...] +function verify_logs() { --- End diff -- move into shared `common-ha.sh` and parameterize the strings to search for, so it's also applicable to the streaming test. ---
[GitHub] flink pull request #6417: [FLINK-9913][runtime] Improve output serialization...
GitHub user zhijiangW opened a pull request: https://github.com/apache/flink/pull/6417 [FLINK-9913][runtime] Improve output serialization only once in RecordWriter ## What is the purpose of the change *This pull request makes `RecordWriter` serialize each output record only once for multiple target channels, rather than once per selected channel.* ## Brief change log - *Only one `RecordSerializer` is created for all the output channels in `RecordWriter`* - *Restructure the processes of `emit`, `broadcastEmit` and `randomEmit` in `RecordWriter`* - *Restructure the interface methods in `RecordSerializer`* ## Verifying this change This change is already covered by existing tests, such as *SpanningRecordSerializationTest*, and adds new tests in `RecordWriterTest` to verify: - *The serialization results of `RecordWriter#emit` with `BroadcastPartitioner` are correct* - *The serialization results of `RecordWriter#broadcastEmit` called directly are correct* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (yes) - The runtime per-record code paths (performance sensitive): (yes) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (no) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented?
(not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/zhijiangW/flink FLINK-9913 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6417.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6417 commit 109ddb37abafcea28478b90cda10b965e0c399d5 Author: Zhijiang Date: 2018-07-25T05:45:23Z [FLINK-9913][runtime] Improve output serialization only once in RecordWriter ---
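The core idea of this PR is to serialize a record once and then copy the resulting bytes to each selected channel, instead of re-serializing it for every channel. The sketch below is hypothetical and assumes a simple length-prefixed UTF-8 encoding. It does not reproduce Flink's `RecordSerializer` or its buffer management, and `SerializeOnceWriter` is an illustrative name.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not Flink's RecordWriter API): serialize a record once
 * into a scratch buffer, then copy the same bytes into every selected channel.
 */
class SerializeOnceWriter {
    private final List<ByteArrayOutputStream> channels = new ArrayList<>();
    private final ByteArrayOutputStream scratch = new ByteArrayOutputStream();
    int serializeCalls = 0; // exposed only to make the "once" property observable

    SerializeOnceWriter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ByteArrayOutputStream());
        }
    }

    /** Length-prefixed UTF-8 encoding: 4-byte big-endian length, then the payload. */
    private byte[] serialize(String record) {
        serializeCalls++;
        byte[] payload = record.getBytes(StandardCharsets.UTF_8);
        scratch.reset();
        int len = payload.length;
        scratch.write((len >>> 24) & 0xFF);
        scratch.write((len >>> 16) & 0xFF);
        scratch.write((len >>> 8) & 0xFF);
        scratch.write(len & 0xFF);
        scratch.write(payload, 0, payload.length);
        return scratch.toByteArray();
    }

    /** Serializes once, then copies the bytes to each of the given target channels. */
    void emit(String record, int... targetChannels) {
        byte[] bytes = serialize(record);
        for (int channel : targetChannels) {
            channels.get(channel).write(bytes, 0, bytes.length);
        }
    }

    /** Broadcast is emit() to all channels, still with a single serialization. */
    void broadcastEmit(String record) {
        int[] all = new int[channels.size()];
        for (int i = 0; i < all.length; i++) {
            all[i] = i;
        }
        emit(record, all);
    }

    byte[] channelBytes(int channel) {
        return channels.get(channel).toByteArray();
    }
}
```

The trade-off is one byte copy per channel in exchange for a single serialization. This pays off most when a record goes to many channels, as with broadcast.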
[GitHub] flink pull request #6416: [FLINK-9942][rest] Guard handlers against null fie...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6416 [FLINK-9942][rest] Guard handlers against null fields ## What is the purpose of the change This PR prevents some NPEs that could arise if fields in the request are set to null or are omitted. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9942 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6416.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6416 commit 3e9046864d566ef6aa0d9af7a09771ebb5fe556b Author: zentol Date: 2018-07-25T06:39:29Z [FLINK-9942][rest] Guard handlers against null fields ---
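Here is a minimal sketch of the kind of guard this PR describes. It assumes a request body whose fields end up null when the client omits them. All names are hypothetical, and this is not Flink's REST handler API. The sketch shows two things: a missing required field is turned into a 400-style response, and omitted optional fields get defaults. Defaults matter for boxed types too, because a null `Boolean` throws an NPE on unboxing.

```java
import java.util.Collections;
import java.util.List;

/** Hypothetical request body; any field may be null if the client omitted it. */
class ExampleRequestBody {
    final String name;        // required
    final List<String> tags;  // optional, defaults to empty
    final Boolean dryRun;     // optional, defaults to false

    ExampleRequestBody(String name, List<String> tags, Boolean dryRun) {
        this.name = name;
        this.tags = tags;
        this.dryRun = dryRun;
    }
}

/** Minimal stand-in for an HTTP response: status code plus message. */
class HandlerResponse {
    final int status;
    final String message;

    HandlerResponse(int status, String message) {
        this.status = status;
        this.message = message;
    }
}

class ExampleHandler {
    static HandlerResponse handle(ExampleRequestBody body) {
        // Reject a missing body or a missing required field with 400 instead of an NPE.
        if (body == null || body.name == null) {
            return new HandlerResponse(400, "Field 'name' is required.");
        }
        // Substitute defaults for omitted optional fields; writing `if (body.dryRun)`
        // would throw an NPE when unboxing a null Boolean.
        List<String> tags = body.tags != null ? body.tags : Collections.<String>emptyList();
        boolean dryRun = Boolean.TRUE.equals(body.dryRun);
        return new HandlerResponse(200, body.name + ": " + tags.size() + " tags, dryRun=" + dryRun);
    }
}
```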
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user cjolif commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205051308 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.util.Preconditions; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.bulk.BackoffPolicy; +import org.elasticsearch.action.bulk.BulkItemResponse; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestClient; +import org.elasticsearch.client.RestHighLevelClient; +import org.elasticsearch.common.unit.TimeValue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +import java.util.List; +import java.util.Map; + +/** + * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 6 and later versions. 
+ */ +public class Elasticsearch6ApiCallBridge implements ElasticsearchApiCallBridge<RestHighLevelClient> { + + private static final long serialVersionUID = -5222683870097809633L; + + private static final Logger LOG = LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class); + + /** +* User-provided HTTP Host. +*/ + private final List<HttpHost> httpHosts; + + Elasticsearch6ApiCallBridge(List<HttpHost> httpHosts) { + Preconditions.checkArgument(httpHosts != null && !httpHosts.isEmpty()); + this.httpHosts = httpHosts; + } + + @Override + public RestHighLevelClient createClient(Map<String, String> clientConfig) { + RestHighLevelClient rhlClient = --- End diff -- It might have been good to support: - a context path / path prefix in addition to the host - login/password for protected Elasticsearch instances. It's OK not to do it, as long as the user can do it by subclassing. To make subclassing easier, maybe there should be two methods: keep the public `createClient` that returns the `RHLClient`, and add a protected method `createRestClientBuilder` which returns the `RestClientBuilder`. This way one can just override the protected method and let the public one handle the actual `RHLClient` instantiation from the `RestClientBuilder` created by the protected method. ---
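The suggested split is a template-method pattern. The public `createClient` always builds the client from whatever builder the protected `createRestClientBuilder` hook returns, so a subclass only needs to override the hook. To keep the example self-contained, `ClientBuilder` and `Client` are hypothetical stand-ins for `RestClientBuilder` and `RestHighLevelClient`, and `CallBridge`/`PrefixedCallBridge` are illustrative names. In the real low-level client, a path prefix is set with `RestClientBuilder#setPathPrefix`, and credentials are usually configured through `setHttpClientConfigCallback`.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for RestClientBuilder. */
class ClientBuilder {
    final List<String> hosts;
    String pathPrefix = "";

    ClientBuilder(List<String> hosts) {
        this.hosts = hosts;
    }

    ClientBuilder setPathPrefix(String prefix) {
        this.pathPrefix = prefix;
        return this;
    }

    Client build() {
        return new Client(hosts, pathPrefix);
    }
}

/** Hypothetical stand-in for RestHighLevelClient. */
class Client {
    final List<String> hosts;
    final String pathPrefix;

    Client(List<String> hosts, String pathPrefix) {
        this.hosts = hosts;
        this.pathPrefix = pathPrefix;
    }
}

class CallBridge {
    private final List<String> hosts;

    CallBridge(List<String> hosts) {
        this.hosts = new ArrayList<>(hosts);
    }

    /** Public entry point: always instantiates the client from the hook's builder. */
    public Client createClient() {
        return createRestClientBuilder(hosts).build();
    }

    /** Protected hook: subclasses customize the builder (path prefix, credentials, ...). */
    protected ClientBuilder createRestClientBuilder(List<String> hosts) {
        return new ClientBuilder(hosts);
    }
}

/** Example subclass that only adds a path prefix. */
class PrefixedCallBridge extends CallBridge {
    private final String prefix;

    PrefixedCallBridge(List<String> hosts, String prefix) {
        super(hosts);
        this.prefix = prefix;
    }

    @Override
    protected ClientBuilder createRestClientBuilder(List<String> hosts) {
        return super.createRestClientBuilder(hosts).setPathPrefix(prefix);
    }
}
```

Because the subclass builds on `super.createRestClientBuilder`, it keeps the default host setup and only layers its own customization on top.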
[GitHub] flink pull request #6414: [hotfix] Enrich exception message
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6414 ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205037835 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger --- End diff -- Removed. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205037795 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. + */ +public class ElasticsearchSinkExample { --- End diff -- Actually, we decided to move examples out of the test code. Removing this. ---
[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6388 ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205031956 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", false) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) --- End diff -- You're right, removed. ---
[GitHub] flink pull request #6372: [Flink-9353] Tests running per job standalone clus...
Github user dawidwys commented on a diff in the pull request: https://github.com/apache/flink/pull/6372#discussion_r205030130 --- Diff: flink-end-to-end-tests/README.md --- @@ -31,6 +31,12 @@ You can also run tests individually via $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2 ``` +### Kubernetes test + +Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running minikube cluster. Right now we cannot --- End diff -- Actually, it assumes Minikube, not Kubernetes. The problem is that we use one Minikube-specific command so that the Docker images we build end up in Minikube's Docker installation. To run the test on an arbitrary Kubernetes cluster, we would need to push the images to a Docker registry and point the cluster at it. The Minikube-specific command is: `eval $(minikube docker-env)` ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r205029480 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"; run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"; +run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"; --- End diff -- Added a loop to wait until the Elasticsearch node is really running. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
GitHub user dawidwys opened a pull request: https://github.com/apache/flink/pull/6415 [FLINK-8974] Run all-round DataSet job with failures in HA mode Added an all-round DataSet end-to-end test that runs in HA mode. It verifies that the job is restarted correctly after a JobManager failure. You can merge this pull request into a Git repository by running: $ git pull https://github.com/dawidwys/flink FLINK-8974 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6415.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6415 commit 4a9a58505ed931c445450c210e2436c910f534b8 Author: Dawid Wysakowicz Date: 2018-07-25T08:45:09Z [FLINK-8974] Run all-round DataSet job with failures in HA mode ---
[GitHub] flink pull request #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfi...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6392 ---
[GitHub] flink pull request #6414: [hotfix] Enrich exception message
GitHub user TisonKun opened a pull request: https://github.com/apache/flink/pull/6414 [hotfix] Enrich exception message ## What is the purpose of the change Once, `ExecutionGraphTestUtils#waitUntilJobStatus` failed for me and threw a `TimeoutException` without a detailed message. I'd like the exception to state the expected status and the actual one. ## Brief change log Trivial work. You can merge this pull request into a Git repository by running: $ git pull https://github.com/TisonKun/flink patch-1 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6414.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6414 commit df17b34fd94cb89fa8073508ea920991c2f17134 Author: 陈梓立 Date: 2018-07-25T08:11:06Z [hotfix] Enrich exception message ---
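For illustration, the idea of this hotfix can be sketched in Java: poll for a status and, on timeout, report both the expected and the last observed value. This is a minimal sketch under assumptions, not the actual `ExecutionGraphTestUtils#waitUntilJobStatus` code; the class `StatusWaiter`, the reduced `JobStatus` enum, the `Supplier`-based signature and the 2 ms polling interval are all hypothetical.

```java
import java.util.concurrent.TimeoutException;
import java.util.function.Supplier;

final class StatusWaiter {

    /** Hypothetical, reduced stand-in for Flink's JobStatus. */
    enum JobStatus { CREATED, RUNNING, FAILING, FAILED, FINISHED }

    private StatusWaiter() {
    }

    /**
     * Polls statusSupplier until it reports the expected status or maxWaitMillis elapses.
     * On timeout, the exception message names the expected and the last observed status.
     */
    static void waitUntilJobStatus(Supplier<JobStatus> statusSupplier, JobStatus expected, long maxWaitMillis)
            throws TimeoutException, InterruptedException {
        final long deadline = System.nanoTime() + maxWaitMillis * 1_000_000L;
        JobStatus actual = statusSupplier.get();
        while (actual != expected) {
            // Overflow-safe deadline check on nanoTime values.
            if (System.nanoTime() - deadline >= 0) {
                throw new TimeoutException("Job did not reach status " + expected + " within "
                        + maxWaitMillis + " ms; last observed status was " + actual + ".");
            }
            Thread.sleep(2);
            actual = statusSupplier.get();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        try {
            // The supplier never changes, so this always times out.
            waitUntilJobStatus(() -> JobStatus.RUNNING, JobStatus.FINISHED, 20);
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running `main` prints `Job did not reach status FINISHED within 20 ms; last observed status was RUNNING.`, which is the kind of detail the PR asks the exception to carry.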
[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...
Github user aljoscha commented on a diff in the pull request: https://github.com/apache/flink/pull/6372#discussion_r205014402 --- Diff: flink-end-to-end-tests/README.md --- @@ -31,6 +31,12 @@ You can also run tests individually via $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh your_test.sh arg1 arg2 ``` +### Kubernetes test + +Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running minikube cluster. Right now we cannot --- End diff -- This should say "assumes a running Kubernetes cluster", and maybe point to Minikube as one easy way to get such a cluster locally. ---
[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...
GitHub user Lemonjing reopened a pull request: https://github.com/apache/flink/pull/6411 [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to ensure CI stability ## What is the purpose of the change This pull request updates `ScalaCsvOutputFormat` in the Scala API to improve CI stability. ## Brief change log Flush before closing in `ScalaCsvOutputFormat` (Scala API). ## Verifying this change Added `ScalaCsvOutputFormatTest`; the test passes. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation Does this pull request introduce a new feature? (yes / **no**) If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Lemonjing/flink csv-close-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6411.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6411 commit 95a9b60b1ece7d248755d92868e682c4ee0fd334 Author: lemonjing <932191671@...> Date: 2018-07-25T07:06:10Z [FLINK-9941][ScalaAPI] flush in ScalaCsvOutputFormat before closing, to ensure CI stability ---
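The change boils down to an ordering in the close path: flush buffered output, then close. A minimal Java sketch of that pattern follows; `CsvRecordWriter` is a hypothetical class, not Flink's `ScalaCsvOutputFormat`, and the `BufferedWriter` with a 4096-character buffer is an assumption made for illustration.

```java
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

class CsvRecordWriterDemo {
    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (CsvRecordWriter writer = new CsvRecordWriter(out, ",", "\n")) {
            writer.writeRecord("a", 1);
            writer.writeRecord("b", 2);
        }
        // Both records are in the target stream once close() has returned.
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}

/** Hypothetical CSV writer illustrating flush-then-close; not Flink's ScalaCsvOutputFormat. */
final class CsvRecordWriter implements AutoCloseable {
    private final Writer wrt;
    private final String fieldDelimiter;
    private final String recordDelimiter;

    CsvRecordWriter(OutputStream stream, String fieldDelimiter, String recordDelimiter) {
        // Records are buffered and reach the stream when the buffer fills or is flushed.
        this.wrt = new BufferedWriter(new OutputStreamWriter(stream, StandardCharsets.UTF_8), 4096);
        this.fieldDelimiter = fieldDelimiter;
        this.recordDelimiter = recordDelimiter;
    }

    void writeRecord(Object... fields) throws IOException {
        for (int i = 0; i < fields.length; i++) {
            if (i > 0) {
                wrt.write(fieldDelimiter);
            }
            wrt.write(String.valueOf(fields[i]));
        }
        wrt.write(recordDelimiter);
    }

    @Override
    public void close() throws IOException {
        // Push any buffered records to the underlying stream first, then release the writer.
        wrt.flush();
        wrt.close();
    }
}
```

With a plain `BufferedWriter`, `close()` already flushes, so the explicit `flush()` mainly makes the intended order visible and keeps the close path independent of how a particular writer implements `close()`.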
[GitHub] flink pull request #6413: [FLINK-8993] [tests] Let general purpose DataStrea...
GitHub user tzulitai opened a pull request: https://github.com/apache/flink/pull/6413 [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction ## What is the purpose of the change Previously, the general-purpose DataStream job used the `KryoSerializer` only via a custom state serializer. This PR allows the job to also use the `KryoSerializer` via Flink's type extraction. ## Brief change log - Adapt the state builders so that they can be given a state class instead of a state type serializer. - Let `DataStreamAllroundTestJob` specify state serializers via state classes instead of a direct custom serializer. ## Verifying this change This is an extension of the existing end-to-end test (`test-streaming-savepoint.sh`). ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? 
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/tzulitai/flink FLINK-8993 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6413.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6413 commit 428d5427227343479b6d63daf7fced8f1bf9a69c Author: Tzu-Li (Gordon) Tai Date: 2018-07-25T06:58:46Z [FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer via type extraction ---
[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...
Github user Lemonjing closed the pull request at: https://github.com/apache/flink/pull/6411 ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204756208 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java --- @@ -39,23 +38,25 @@ * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed. */ @Internal -public interface ElasticsearchApiCallBridge extends Serializable { +public abstract class ElasticsearchApiCallBridge implements Serializable { /** -* Creates an Elasticsearch {@link Client}. +* Creates an Elasticsearch client implementing {@link AutoCloseable}. * * @param clientConfig The configuration to use when constructing the client. * @return The created client. */ - Client createClient(Map clientConfig); + public abstract AutoCloseable createClient(Map clientConfig); + + public abstract BulkProcessor.Builder createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener listener); --- End diff -- No docs here? ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204999073 --- Diff: flink-end-to-end-tests/run-nightly-tests.sh --- @@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end test" "$END_TO_END_DIR run_test "Elasticsearch (v1.7.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz"; run_test "Elasticsearch (v2.3.5) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz"; run_test "Elasticsearch (v5.1.2) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz"; +run_test "Elasticsearch (v6.3.1) sink end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz"; --- End diff -- The test is not runnable on my machine. ``` Elasticsearch node is not running. grep: /Users/twalthr/flink/flink/build-target/log/*.out: No such file or directory [FAIL] './test-scripts/test_streaming_elasticsearch.sh 6 https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz' failed after 0 minutes and 18 seconds! Test exited with exit code 1 ``` The test exits before Elasticsearch has actually started. Killing it also does not work: an Elasticsearch process is still running afterwards. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204991878 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java --- @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch; + +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase; + +import org.elasticsearch.client.Client; +import org.elasticsearch.common.network.NetworkModule; +import org.elasticsearch.common.settings.Settings; +import org.elasticsearch.node.InternalSettingsPreparer; +import org.elasticsearch.node.Node; +import org.elasticsearch.plugins.Plugin; +import org.elasticsearch.transport.Netty4Plugin; + +import java.io.File; +import java.util.Collections; + +/** + * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for Elasticsearch 6. + * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for integration tests. 
+ */ +public class EmbeddedElasticsearchNodeEnvironmentImpl implements EmbeddedElasticsearchNodeEnvironment { + + private Node node; + + @Override + public void start(File tmpDataFolder, String clusterName) throws Exception { + if (node == null) { + Settings settings = Settings.builder() + .put("cluster.name", clusterName) + .put("http.enabled", false) + .put("path.home", tmpDataFolder.getParent()) + .put("path.data", tmpDataFolder.getAbsolutePath()) + .put(NetworkModule.HTTP_TYPE_KEY, Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME) --- End diff -- Are these values still valid? I thought we no longer rely on Netty now that we use the REST client. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204758828 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java --- @@ -176,7 +175,7 @@ public void setDelayMillis(long delayMillis) { private AtomicLong numPendingRequests = new AtomicLong(0); /** Elasticsearch client created using the call bridge. */ - private transient Client client; + private transient AutoCloseable client; --- End diff -- Same here. Why not parameterize the class and be type safe? ---
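A rough sketch of what parameterizing by the client type could look like, so that the field is typed as the concrete client rather than a bare `AutoCloseable`. All names here (`ApiCallBridge`, `TypedSinkBase`, `FakeClient`) are hypothetical simplifications, not the connector's real classes; the real call bridge also creates and configures the `BulkProcessor`, which this sketch omits.

```java
import java.io.Serializable;
import java.util.Collections;
import java.util.Map;

class TypedSinkDemo {
    public static void main(String[] args) throws Exception {
        TypedSinkBase<FakeClient> sink = new TypedSinkBase<>(config -> new FakeClient());
        sink.open(Collections.emptyMap());
        FakeClient client = sink.getClient(); // typed access, no cast from AutoCloseable
        System.out.println(client.clusterName());
        sink.close();
        System.out.println(client.closed);
    }
}

/** Hypothetical, simplified call bridge that is generic in the client type C. */
interface ApiCallBridge<C extends AutoCloseable> extends Serializable {
    C createClient(Map<String, String> clientConfig);
}

/** Hypothetical sink base: the client field is typed as C, so no casts are needed. */
class TypedSinkBase<C extends AutoCloseable> {
    private final ApiCallBridge<C> callBridge;
    private transient C client;

    TypedSinkBase(ApiCallBridge<C> callBridge) {
        this.callBridge = callBridge;
    }

    void open(Map<String, String> clientConfig) {
        client = callBridge.createClient(clientConfig);
    }

    C getClient() {
        return client;
    }

    void close() throws Exception {
        if (client != null) {
            client.close();
            client = null;
        }
    }
}

/** Hypothetical stand-in for a version-specific client. */
class FakeClient implements AutoCloseable {
    boolean closed;

    String clusterName() {
        return "test-cluster";
    }

    @Override
    public void close() {
        closed = true;
    }
}
```

Each version-specific sink would fix `C` once (for example to a REST or a transport client), and code in the base class and its subclasses could then use that type without casts.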
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204758434 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java --- @@ -64,13 +65,15 @@ * @param builder the {@link BulkProcessor.Builder} to configure. * @param flushBackoffPolicy user-provided backoff retry settings ({@code null} if the user disabled backoff retries). */ - void configureBulkProcessorBackoff( + public abstract void configureBulkProcessorBackoff( BulkProcessor.Builder builder, @Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy flushBackoffPolicy); /** * Perform any necessary state cleanup. */ - void cleanup(); + public void cleanup() { --- End diff -- Use Java 8 defaults and let this class stay an interface? ---
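The suggestion relies on Java 8 default methods: an interface can ship a no-op `cleanup()`, so implementations override it only when they need to, and the type does not have to become an abstract class. A minimal sketch; `CallBridge`, `StatelessCallBridge` and `StatefulCallBridge` are hypothetical stand-ins for `ElasticsearchApiCallBridge` and its implementations, reduced to a single `createClient` method.

```java
import java.io.Serializable;
import java.util.Map;

class DefaultCleanupDemo {
    public static void main(String[] args) {
        CallBridge stateless = new StatelessCallBridge();
        stateless.cleanup(); // inherited no-op default

        StatefulCallBridge stateful = new StatefulCallBridge();
        stateful.cleanup();
        System.out.println(stateful.cleanupCount); // 1
    }
}

/** Hypothetical call bridge that stays an interface; cleanup() is a Java 8 default method. */
interface CallBridge extends Serializable {
    AutoCloseable createClient(Map<String, String> clientConfig);

    /** Perform any necessary state cleanup. The default does nothing. */
    default void cleanup() {
        // Intentionally empty: only bridges that hold extra state need to override this.
    }
}

/** Needs no cleanup, so it simply inherits the default. */
class StatelessCallBridge implements CallBridge {
    @Override
    public AutoCloseable createClient(Map<String, String> clientConfig) {
        return () -> { };
    }
}

/** Holds state and therefore overrides the default. */
class StatefulCallBridge implements CallBridge {
    int cleanupCount;

    @Override
    public AutoCloseable createClient(Map<String, String> clientConfig) {
        return () -> { };
    }

    @Override
    public void cleanup() {
        cleanupCount++;
    }
}
```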
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204993234 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.RestHighLevelClient; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * IT cases for the {@link ElasticsearchSink}. + */ +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + /** +* Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}. 
+*/ + public void runTransportClientTest() throws Exception { + final String index = "transport-client-test-index"; + + final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + + DataStreamSource> source = env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction()); + + Map userConfig = new HashMap<>(); + // This instructs the sink to emit after every element, otherwise they would be buffered + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + + source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig, + new SourceSinkDataTestKit.TestElasticsearchSinkFunction(index))); + + env.execute("Elasticsearch RestHighLevelClient Test"); + + // verify the results + Client client = embeddedNodeEnv.getClient(); + SourceSinkDataTestKit.verifyProducedSinkData(client, index); + + client.close(); + } + + /** +* Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is {@code null}. +*/ + public void runNullTransportClientTest() throws Exception { + try { + Map userConfig = new HashMap<>(); + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + createElasticsearchSink6(userConfig, null, new SourceSinkDataTestKit.TestElasticsearchSinkFunction("test")); + } catch (IllegalArgumentException expectedException) { + // test passes + return; + } + + fail(); + } + + /** +* Tests that the Elasticsearch sink fails eagerly if the provided list of transport addresses is empty. +*/ + public void runEmptyTransportClientTest() throws Exception { + try { + Map userConfig = new HashMap<>(); + userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1"); + createElasticsearchSink6(userConfig, + Collect
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204994308 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties --- @@ -0,0 +1,27 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +log4j.rootLogger=INFO, testlogger + +log4j.appender.testlogger=org.apache.log4j.ConsoleAppender +log4j.appender.testlogger.target=System.err +log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout +log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n + +# suppress the irrelevant (wrong) warnings from the netty channel handler +log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, testlogger --- End diff -- Still necessary? ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204990734 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. 
+ * + * The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. + * + * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found + * in the https://www.elastic.io";>Elasticsearch documentation. An important setting is {@code cluster.name}, + * which should be set to the name of the cluster that the sink should emit to. + * + * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * + *{@code bulk.flush.max.actions}: Maximum amount of elements to buffer + *{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + *{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * + * + * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. + * + * @param Type of the elements handled by this sink + */ +public class ElasticsearchSink extends ElasticsearchSinkBase { --- End diff -- Add `@PublicEvolving` ---
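To make the buffering behaviour described in this Javadoc concrete, here is a toy Java model of the count-based and size-based flush triggers. It is not Elasticsearch's `BulkProcessor`. The `BulkBuffer` class, measuring request size as the UTF-8 byte length of a string, and treating a missing key as "disabled" are assumptions for illustration. The time-based `bulk.flush.interval.ms` trigger is left out, and the explicit `flush()` call stands in for it.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class BulkBufferDemo {
    public static void main(String[] args) {
        Map<String, String> userConfig = new HashMap<>();
        userConfig.put("bulk.flush.max.actions", "3");
        userConfig.put("bulk.flush.max.size.mb", "1");
        BulkBuffer buffer = new BulkBuffer(userConfig);
        for (int i = 0; i < 7; i++) {
            buffer.add("doc-" + i);
        }
        buffer.flush(); // stands in for the interval timer or closing the sink
        System.out.println(buffer.flushedBatches());
    }
}

/** Toy model, NOT Elasticsearch's BulkProcessor: flushes when either threshold is reached. */
class BulkBuffer {
    private final int maxActions; // <= 0 means "no count limit" in this toy model
    private final long maxBytes;  // <= 0 means "no size limit" in this toy model
    private final List<String> pending = new ArrayList<>();
    private final List<List<String>> flushed = new ArrayList<>();
    private long pendingBytes;

    BulkBuffer(Map<String, String> config) {
        maxActions = Integer.parseInt(config.getOrDefault("bulk.flush.max.actions", "-1"));
        long mb = Long.parseLong(config.getOrDefault("bulk.flush.max.size.mb", "-1"));
        maxBytes = mb <= 0 ? -1 : mb * 1024 * 1024;
    }

    void add(String request) {
        pending.add(request);
        pendingBytes += request.getBytes(StandardCharsets.UTF_8).length;
        boolean actionsReached = maxActions > 0 && pending.size() >= maxActions;
        boolean sizeReached = maxBytes > 0 && pendingBytes >= maxBytes;
        if (actionsReached || sizeReached) {
            flush();
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushed.add(new ArrayList<>(pending)); // "send" the batch
        pending.clear();
        pendingBytes = 0;
    }

    List<List<String>> flushedBatches() {
        return flushed;
    }
}
```

Running `main` prints `[[doc-0, doc-1, doc-2], [doc-3, doc-4, doc-5], [doc-6]]`. The first two batches are cut by `bulk.flush.max.actions=3`, and the last element is sent only on the explicit flush.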
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204775927 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. 
+ * + * The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. + * + * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found --- End diff -- Update docs here. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204992515 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java --- @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.streaming.api.datastream.DataStreamSource; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase; +import org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit; + +import org.apache.http.HttpHost; +import org.elasticsearch.client.Client; +import org.elasticsearch.client.RestHighLevelClient; + +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +/** + * IT cases for the {@link ElasticsearchSink}. + */ +public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase { + + /** +* Tests that the Elasticsearch sink works properly using a {@link RestHighLevelClient}. +*/ + public void runTransportClientTest() throws Exception { --- End diff -- There are no `@Test` annotations, so these tests never run. We should also rename the method to `testXXX`, as we usually do. The superclass method names should be updated as well, since `runTransportClientTest` is no longer accurate. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204991006 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; +import org.elasticsearch.action.bulk.BulkProcessor; +import org.elasticsearch.client.RestHighLevelClient; + +import java.util.List; +import java.util.Map; + +/** + * Elasticsearch 6.x sink that requests multiple {@link ActionRequest ActionRequests} + * against a cluster for each incoming element. 
+ * + * The sink internally uses a {@link RestHighLevelClient} to communicate with an Elasticsearch cluster. + * The sink will fail if no cluster can be connected to using the provided transport addresses passed to the constructor. + * + * The {@link Map} passed to the constructor is used to create the {@code TransportClient}. The config keys can be found + * in the https://www.elastic.io";>Elasticsearch documentation. An important setting is {@code cluster.name}, + * which should be set to the name of the cluster that the sink should emit to. + * + * Internally, the sink will use a {@link BulkProcessor} to send {@link ActionRequest ActionRequests}. + * This will buffer elements before sending a request to the cluster. The behaviour of the + * {@code BulkProcessor} can be configured using these config keys: + * + *{@code bulk.flush.max.actions}: Maximum amount of elements to buffer + *{@code bulk.flush.max.size.mb}: Maximum amount of data (in megabytes) to buffer + *{@code bulk.flush.interval.ms}: Interval at which to flush data regardless of the other two + * settings in milliseconds + * + * + * You also have to provide an {@link ElasticsearchSinkFunction}. This is used to create multiple + * {@link ActionRequest ActionRequests} for each incoming element. See the class level documentation of + * {@link ElasticsearchSinkFunction} for an example. + * + * @param Type of the elements handled by this sink + */ +public class ElasticsearchSink extends ElasticsearchSinkBase { + + private static final long serialVersionUID = 1L; + + /** +* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}. +* +* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element +* @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to. 
+*/ + public ElasticsearchSink(Map userConfig, List httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction) { + + this(userConfig, httpHosts, elasticsearchSinkFunction, new NoOpFailureHandler()); + } + + /** +* Creates a new {@code ElasticsearchSink} that connects to the cluster using a {@link RestHighLevelClient}. +* +* @param elasticsearchSinkFunction This is used to generate multiple {@link ActionRequest} from the incoming element +* @param failureHandler This is used to handle failed {@link ActionRequest} +* @param httpHosts The list of {@HttpHost} to which the {@link RestHighLevelClient} connects to. --- End diff -- Fix two invalid Javadocs. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204993809 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java --- @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.connectors.elasticsearch6.examples; + +import org.apache.flink.api.common.functions.MapFunction; +import org.apache.flink.api.common.functions.RuntimeContext; +import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer; +import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.index.IndexRequest; +import org.elasticsearch.client.Requests; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example shows how to use the Elasticsearch Sink. Before running it you must ensure that + * you have a cluster named "elasticsearch" running or change the name of cluster in the config map. + */ +public class ElasticsearchSinkExample { --- End diff -- We should add tests for our examples. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204991076 --- Diff: flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java --- @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.connectors.elasticsearch6; + +import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase; +import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction; +import org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler; + +import org.apache.http.HttpHost; +import org.elasticsearch.action.ActionRequest; +import org.elasticsearch.action.DocWriteRequest; --- End diff -- Remove unused import. ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204757871 --- Diff: flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java --- @@ -39,23 +38,25 @@ * exactly one instance of the call bridge, and state cleanup is performed when the sink is closed. */ @Internal -public interface ElasticsearchApiCallBridge extends Serializable { +public abstract class ElasticsearchApiCallBridge implements Serializable { --- End diff -- Parameterize the class by the client type instead of using `AutoCloseable` as a stand-in for the client that implements this interface. This avoids manual casting in subclasses. ---
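A minimal sketch of the suggested parameterization, in plain Java with no Flink or Elasticsearch dependencies. The bridge takes a type parameter `C extends AutoCloseable`, so a subclass declares its concrete client type once and its methods receive that type without casts. `CallBridge`, `FakeRestClient`, `FakeRestCallBridge`, and `describe` are hypothetical names for illustration, not Flink's `ElasticsearchApiCallBridge` API.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the bridge is parameterized by its client type C.
abstract class CallBridge<C extends AutoCloseable> implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Creates the version-specific client from the user configuration. */
    abstract C createClient(Map<String, String> config);

    /** Uses the typed client directly; subclasses need no cast from AutoCloseable. */
    abstract String describe(C client);
}

/** Hypothetical stand-in for a real client class such as RestHighLevelClient. */
final class FakeRestClient implements AutoCloseable {

    private final String host;
    private boolean closed;

    FakeRestClient(String host) {
        this.host = host;
    }

    String host() {
        return host;
    }

    boolean isClosed() {
        return closed;
    }

    @Override
    public void close() {
        closed = true;
    }
}

final class FakeRestCallBridge extends CallBridge<FakeRestClient> {

    private static final long serialVersionUID = 1L;

    @Override
    FakeRestClient createClient(Map<String, String> config) {
        return new FakeRestClient(config.getOrDefault("host", "localhost"));
    }

    @Override
    String describe(FakeRestClient client) {
        // The parameter is already a FakeRestClient, so host() is available without a cast.
        return "connected to " + client.host();
    }
}

final class CallBridgeDemo {

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("host", "127.0.0.1");
        CallBridge<FakeRestClient> bridge = new FakeRestCallBridge();
        try (FakeRestClient client = bridge.createClient(config)) {
            System.out.println(bridge.describe(client)); // prints "connected to 127.0.0.1"
        }
    }
}
```

With an unparameterized bridge that returns `AutoCloseable`, each subclass would have to cast the client back to its concrete type before calling anything client-specific; the type parameter moves that check to compile time.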
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204750192 --- Diff: docs/dev/connectors/elasticsearch.md --- @@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, transportAddresses, new Elasticsea } }));{% endhighlight %} + +{% highlight java %} +DataStream input = ...; + +List httpHost = new ArrayList<>(); +httpHosts.add(new HttpHost("127.0.0.1", 9200, "http")); +httpHosts.add(new HttpHost("10.2.3.1", 9200, "http")); + +input.addSink(new ElasticsearchSink<>(httpHosts, new ElasticsearchSinkFunction() { --- End diff -- Should we add an example for the user config as well, to stay in sync with the examples for the other versions? The following paragraph mentions: > Especially important is the `cluster.name` parameter Btw, could you also add imports to your examples? I just started doing this in my code examples to make it easier for people to find the classes used (see [here](https://ci.apache.org/projects/flink/flink-docs-master/dev/java_lambdas.html)) ---
[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...
Github user twalthr commented on a diff in the pull request: https://github.com/apache/flink/pull/6391#discussion_r204752713 --- Diff: docs/dev/connectors/elasticsearch.md --- @@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, transportAddresses, new Elasticsearc })) {% endhighlight %} + +{% highlight scala %} +val input: DataStream[String] = ... + +val httpHosts = new java.util.ArrayList[HttpHost] +httpHosts.add(new HttpHost("127.0.0.1", 9300, "http")) +httpHosts.add(new HttpHost("10.2.3.1", 9300, "http")) + +input.addSink(new ElasticsearchSink(httpHosts, new ElasticsearchSinkFunction[String] { + def createIndexRequest(element: String): IndexRequest = { +val json = new java.util.HashMap[String, String] +json.put("data", element) + +return Requests.indexRequest() +.index("my-index") +.type("my-type") +.source(json) + } +})) +{% endhighlight %} + -Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`. +Note how `TransportClient` based version use a `Map` of `String`s is used to configure the `ElasticsearchSink`. --- End diff -- Remove "is used to"? ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204999405 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); --- End diff -- having a local variable here seems a bit redundant, since we always adjust it right afterwards. ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204996330 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); --- End diff -- just a minor nit pick here: `adjustmentEndTimeNanos` would be better named as `adjustedEndTimeNanos` ---
[GitHub] flink pull request #6231: [FLINK-9694] Potentially NPE in CompositeTypeSeria...
Github user yanghua closed the pull request at: https://github.com/apache/flink/pull/6231 ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tzulitai commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204998677 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,69 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + maxNumberOfRecordsPerFetch = adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes); + processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } } + /** +* Adjusts loop timing to match target frequency if specified. 
+* @param processingStartTimeNanos The start time of the run loop "work" +* @param processingEndTimeNanos The end time of the run loop "work" +* @return The System.nanoTime() after the sleep (if any) +* @throws InterruptedException +*/ + protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + } + } + return endTimeNanos; + } + + /** +* Calculates how many records to read each time through the loop based on a target throughput +* and the measured frequenecy of the loop. +* @param runLoopTimeNanos The total time of one pass through the loop +* @param numRecords The number of records of the last read operation +* @param recordBatchSizeBytes The total batch size of the last read operation +*/ + protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) { --- End diff -- If this method has no side effects on the fields of the `ShardConsumer`, it might be better to make it static and pass in the current `maxNumberOfRecordsPerFetch` as an argument. This makes it clearer that it is only a utility method for calculating the number of records to read. ---
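A sketch of the refactoring suggested above: a static helper that reads no fields and receives the current limit as an argument, so the caller has to use its return value. The 2 MiB/s per-shard read limit and the cap of 10,000 records per fetch are assumptions here, standing in for `KINESIS_SHARD_BYTES_PER_SECOND_LIMIT` and `ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX`. `AdaptiveReads` is a hypothetical class name, and the lower bound of 1 is an extra guard that the quoted diff does not have.

```java
// Hypothetical sketch: a static, side-effect-free version of adaptRecordsToRead.
final class AdaptiveReads {

    // Assumed per-shard read limit of 2 MiB per second.
    static final long SHARD_BYTES_PER_SECOND_LIMIT = 2L * 1024 * 1024;

    // Assumed upper bound on records per GetRecords call.
    static final int GET_RECORDS_MAX = 10_000;

    private AdaptiveReads() {
    }

    /**
     * Returns the number of records to request next, given the timing and size of the last
     * read. Returns {@code currentMaxRecordsPerFetch} unchanged when nothing can be measured.
     */
    static int adaptRecordsToRead(
            boolean useAdaptiveReads,
            long runLoopTimeNanos,
            int numRecords,
            long recordBatchSizeBytes,
            int currentMaxRecordsPerFetch) {
        if (!useAdaptiveReads || numRecords == 0 || runLoopTimeNanos == 0) {
            return currentMaxRecordsPerFetch;
        }
        long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
        // One pass takes runLoopTimeNanos, so the loop runs at 1e9 / runLoopTimeNanos Hz.
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
        // Spread the per-second byte budget across the loop passes made in one second.
        double bytesPerRead = SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int adapted = (int) (bytesPerRead / averageRecordSizeBytes);
        // Keep the result in [1, GET_RECORDS_MAX].
        return Math.max(1, Math.min(adapted, GET_RECORDS_MAX));
    }

    public static void main(String[] args) {
        int max = GET_RECORDS_MAX;
        // Last pass: 100 records, 102,400 bytes in total, 200 ms loop time.
        max = adaptRecordsToRead(true, 200_000_000L, 100, 102_400L, max);
        System.out.println(max); // prints 409
    }
}
```

For the values in `main`, a 200 ms loop runs at 5 Hz, so each read may use 2,097,152 / 5 = 419,430.4 bytes; with 1,024-byte records on average that is 409.6, truncated to 409 records per fetch.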
[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...
Github user buptljy closed the pull request at: https://github.com/apache/flink/pull/6412 ---
[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...
GitHub user buptljy opened a pull request: https://github.com/apache/flink/pull/6412 [FLINK-9941] Flush in ScalaCsvOutputFormat before close ## What is the purpose of the change - Flush in ScalaCsvOutputFormat before close. This is already done in `org.apache.flink.api.java.io.CsvOutputFormat`. ## Brief change log - Add a flush call in ScalaCsvOutputFormat before close. ## Verifying this change - Unit tests. ## Does this pull request potentially affect one of the following parts: - no You can merge this pull request into a Git repository by running: $ git pull https://github.com/buptljy/flink FLINK-9941 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6412.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6412 commit 636456d2398bef69a805b96dfb0945459cfcfada Author: wind Date: 2018-07-25T06:01:36Z flush ScalaCsvOutputFormat before close ---
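Both FLINK-9941 PRs describe the same close path the Java `CsvOutputFormat` already uses: flush the writer, then close it. The following is a minimal plain-`java.io` sketch of that ordering. `CsvLineWriter` is a hypothetical class, not Flink's `ScalaCsvOutputFormat`, and it joins fields with commas without any CSV quoting.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of the "flush before close" pattern; not Flink code.
final class CsvLineWriter {

    private final Writer wrt;

    CsvLineWriter(OutputStream out) {
        this.wrt = new OutputStreamWriter(out, StandardCharsets.UTF_8);
    }

    /** Writes one record as comma-separated fields (no quoting or escaping). */
    void writeRecord(String... fields) throws IOException {
        wrt.write(String.join(",", fields));
        wrt.write('\n');
    }

    /** Flushes buffered characters to the stream first, then closes the writer. */
    void close() throws IOException {
        wrt.flush();
        wrt.close();
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        CsvLineWriter writer = new CsvLineWriter(out);
        writer.writeRecord("a", "1");
        writer.writeRecord("b", "2");
        writer.close();
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```

`java.io.Writer.close()` is itself specified to flush before closing, so the explicit `flush()` mainly makes the ordering visible and mirrors what the Java format does.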
[GitHub] flink pull request #6403: [FLINK-9934] [table] Fix invalid field mapping by ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6403 ---
[GitHub] flink pull request #3124: [FLINK-5281] Extend KafkaJsonTableSources to suppo...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/3124 ---
[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...
GitHub user Lemonjing opened a pull request: https://github.com/apache/flink/pull/6411 [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to ensure CI stability ## What is the purpose of the change This pull request updates the Scala API's `ScalaCsvOutputFormat` to improve CI stability. ## Brief change log Call `flush` before `close` in `ScalaCsvOutputFormat` (Scala API). ## Verifying this change This change is already covered by existing tests, such as ScalarFunctionsTest. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with @Public(Evolving): (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation Does this pull request introduce a new feature? (yes / **no**) If yes, how is the feature documented? (not applicable / docs / JavaDocs / **not documented**) You can merge this pull request into a Git repository by running: $ git pull https://github.com/Lemonjing/flink csv-close-hotfix Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6411.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6411 commit 6c6120722eef81c2c275b92a13a5687fef35e7bb Author: lemonjing <932191671@...> Date: 2018-07-25T05:46:55Z [hotfix] Flush in ScalaCsvOutputFormat before closing, to ensure CI stability ---
[GitHub] flink pull request #6410: Release 1.6
Github user uang520 closed the pull request at: https://github.com/apache/flink/pull/6410 ---
[GitHub] flink pull request #6410: Release 1.6
GitHub user uang520 opened a pull request: https://github.com/apache/flink/pull/6410 Release 1.6 *Thank you very much for contributing to Apache Flink - we are happy that you want to help us improve Flink. To help the community review your contribution in the best possible way, please go through the checklist below, which will get the contribution into a shape in which it can be best reviewed.* *Please understand that we do not do this to make contributions to Flink a hassle. In order to uphold a high standard of quality for code contributions, while at the same time managing a large number of contributions, we need contributors to prepare the contributions well, and give reviewers enough contextual information for the review. Please also understand that contributions that do not follow this guide will take longer to review and thus typically be picked up with lower priority by the community.* ## Contribution Checklist - Make sure that the pull request corresponds to a [JIRA issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are made for typos in JavaDoc or documentation files, which need no JIRA issue. - Name the pull request in the form "[FLINK-] [component] Title of the pull request", where *FLINK-* should be replaced by the actual issue number. Skip *component* if you are unsure about which is the best component. Typo fixes that have no associated JIRA issue should be named following this pattern: `[hotfix] [docs] Fix typo in event time introduction` or `[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`. - Fill out the template below to describe the changes contributed by the pull request. That will give reviewers the context they need to do the review. - Make sure that the change passes the automated tests, i.e., `mvn clean verify` passes. You can set up Travis CI to do that following [this guide](http://flink.apache.org/contribute-code.html#best-practices). 
- Each pull request should address only one issue, not mix up code from multiple issues. - Each commit in the pull request has a meaningful commit message (including the JIRA id) - Once all items of the checklist are addressed, remove the above text and this checklist, leaving only the filled out template below. **(The sections below can be removed for hotfixes of typos)** ## What is the purpose of the change *(For example: This pull request makes task deployment go through the blob server, rather than through RPC. That way we avoid re-transferring them on each deployment (during recovery).)* ## Brief change log *(for example:)* - *The TaskInfo is stored in the blob store on job creation time as a persistent artifact* - *Deployments RPC transmits only the blob storage reference* - *TaskManagers retrieve the TaskInfo from the blob cache* ## Verifying this change *(Please pick either of the following options)* This change is a trivial rework / code cleanup without any test coverage. *(or)* This change is already covered by existing tests, such as *(please describe tests)*. 
*(or)* This change added tests and can be verified as follows: *(example:)* - *Added integration tests for end-to-end deployment with large payloads (100MB)* - *Extended integration test for recovery after master (JobManager) failure* - *Added test that validates that TaskInfo is transferred only once across recoveries* - *Manually verified the change by running a 4 node cluster with 2 JobManagers and 4 TaskManagers, a stateful streaming program, and killing one JobManager and two TaskManagers during the execution, verifying that recovery happens correctly.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / no) - The serializers: (yes / no / don't know) - The runtime per-record code paths (performance sensitive): (yes / no / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know) - The S3 file system connector: (yes / no / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / no) - If yes, how is the feature documented? (not applicable / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/apache/flink release-1.6 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6410.patch To close this p
[GitHub] flink pull request #6401: [hotfix]fix typo for variable name dynamicProperti...
Github user rileyli closed the pull request at: https://github.com/apache/flink/pull/6401 ---
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user glaksh100 commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204942574 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,68 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes); + processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } } + /** +* Adjusts loop timing to match target frequency if specified. 
+* @param processingStartTimeNanos The start time of the run loop "work" +* @param processingEndTimeNanos The end time of the run loop "work" +* @return The System.nanoTime() after the sleep (if any) +* @throws InterruptedException +*/ + protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + } + } + return endTimeNanos; + } + + /** +* Calculates how many records to read each time through the loop based on a target throughput +* and the measured frequenecy of the loop. +* @param runLoopTimeNanos The total time of one pass through the loop +* @param numRecords The number of records of the last read operation +* @param recordBatchSizeBytes The total batch size of the last read operation +*/ + protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) { + if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) { + long averageRecordSizeBytes = recordBatchSizeBytes / numRecords; + // Adjust number of records to fetch from the shard depending on current average record size + // to optimize 2 Mb / sec read limits + double loopFrequencyHz = 1000000000.0d / runLoopTimeNanos; + double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz; + maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); + // Ensure the value is not more than 10000L + maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); + } + return maxNumberOfRecordsPerFetch; --- End diff -- Oops, thanks for catching. 
Updated to use the return value and also to use a local variable in the method to avoid re-assigning the class variable `maxNumberOfRecordsP
[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...
Github user tweise commented on a diff in the pull request: https://github.com/apache/flink/pull/6408#discussion_r204922265 --- Diff: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java --- @@ -233,26 +225,68 @@ public void run() { subscribedShard.getShard().getHashKeyRange().getEndingHashKey()); long recordBatchSizeBytes = 0L; - long averageRecordSizeBytes = 0L; - for (UserRecord record : fetchedRecords) { recordBatchSizeBytes += record.getData().remaining(); deserializeRecordForCollectionAndUpdateState(record); } - if (useAdaptiveReads && !fetchedRecords.isEmpty()) { - averageRecordSizeBytes = recordBatchSizeBytes / fetchedRecords.size(); - maxNumberOfRecordsPerFetch = getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes); - } - nextShardItr = getRecordsResult.getNextShardIterator(); + + long processingEndTimeNanos = System.nanoTime(); + + long adjustmentEndTimeNanos = adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos); + long runLoopTimeNanos = adjustmentEndTimeNanos - processingStartTimeNanos; + adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), recordBatchSizeBytes); + processingStartTimeNanos = adjustmentEndTimeNanos; // for next time through the loop } } } catch (Throwable t) { fetcherRef.stopWithError(t); } } + /** +* Adjusts loop timing to match target frequency if specified. 
+* @param processingStartTimeNanos The start time of the run loop "work" +* @param processingEndTimeNanos The end time of the run loop "work" +* @return The System.nanoTime() after the sleep (if any) +* @throws InterruptedException +*/ + protected long adjustRunLoopFrequency(long processingStartTimeNanos, long processingEndTimeNanos) + throws InterruptedException { + long endTimeNanos = processingEndTimeNanos; + if (fetchIntervalMillis != 0) { + long processingTimeNanos = processingEndTimeNanos - processingStartTimeNanos; + long sleepTimeMillis = fetchIntervalMillis - (processingTimeNanos / 1_000_000); + if (sleepTimeMillis > 0) { + Thread.sleep(sleepTimeMillis); + endTimeNanos = System.nanoTime(); + } + } + return endTimeNanos; + } + + /** +* Calculates how many records to read each time through the loop based on a target throughput +* and the measured frequency of the loop. +* @param runLoopTimeNanos The total time of one pass through the loop +* @param numRecords The number of records of the last read operation +* @param recordBatchSizeBytes The total batch size of the last read operation +*/ + protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, long recordBatchSizeBytes) { + if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) { + long averageRecordSizeBytes = recordBatchSizeBytes / numRecords; + // Adjust number of records to fetch from the shard depending on current average record size + // to optimize 2 MB / sec read limits + double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos; + double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz; + maxNumberOfRecordsPerFetch = (int) (bytesPerRead / averageRecordSizeBytes); + // Ensure the value is not more than DEFAULT_SHARD_GETRECORDS_MAX + maxNumberOfRecordsPerFetch = Math.min(maxNumberOfRecordsPerFetch, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX); + } + return maxNumberOfRecordsPerFetch; --- End diff -- the return value is never used ---
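The pacing half of the refactor can be isolated into a pure function that is easy to unit-test. This is a sketch, not the connector's code: `RunLoopPacingSketch` and `computeSleepMillis` are hypothetical names, and `fetchIntervalMillis` is passed as a parameter instead of being read from a field. Like the diff, it sleeps only for the part of the interval that processing did not already use, and treats 0 as "no pacing".

```java
public final class RunLoopPacingSketch {

    private RunLoopPacingSketch() {}

    /** Milliseconds to sleep so that one pass lasts at least fetchIntervalMillis; 0 if no sleep is needed. */
    static long computeSleepMillis(long fetchIntervalMillis, long processingStartNanos, long processingEndNanos) {
        if (fetchIntervalMillis == 0) {
            return 0; // pacing disabled
        }
        long processingTimeNanos = processingEndNanos - processingStartNanos;
        long sleepTimeMillis = fetchIntervalMillis - processingTimeNanos / 1_000_000;
        return Math.max(0, sleepTimeMillis);
    }

    /** Sleeps if needed and returns the timestamp that ends this pass (and starts the next one). */
    static long adjustRunLoopFrequency(long fetchIntervalMillis, long startNanos, long endNanos)
            throws InterruptedException {
        long sleepMillis = computeSleepMillis(fetchIntervalMillis, startNanos, endNanos);
        if (sleepMillis > 0) {
            Thread.sleep(sleepMillis);
            return System.nanoTime();
        }
        return endNanos;
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.nanoTime();
        // ... fetch and deserialize records here ...
        long end = System.nanoTime();
        long next = adjustRunLoopFrequency(200, start, end);
        System.out.println("run loop took " + (next - start) / 1_000_000 + " ms");
    }
}
```

Returning the post-sleep timestamp lets the caller use it both to measure the full loop time and as the start time of the next pass, as the diff does with `processingStartTimeNanos = adjustmentEndTimeNanos`.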
[GitHub] flink pull request #6409: Flink 9899.kinesis connector metrics
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/6409 Flink 9899.kinesis connector metrics ## What is the purpose of the change The purpose of this change is to add metrics to the `ShardConsumer` to get more observability into the performance of the Kinesis connector, including the enhancements introduced in [FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9899). **Important** - https://github.com/apache/flink/pull/6408 has to be merged **before** this change. ## Brief change log All metrics are added as gauges. The following per-shard metrics are added: - sleepTimeMillis - maxNumberOfRecordsPerFetch - numberOfAggregatedRecordsPerFetch - numberOfDeaggregatedRecordsPerFetch - bytesRequestedPerFetch - averageRecordSizeBytes - runLoopTimeNanos - loopFrequencyHz ## Verifying this change This change is already covered by existing tests, such as: `ShardConsumerTest`, `KinesisDataFetcherTest`. ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented?
(**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6409.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6409 commit f333781a7c4f1a10b6120a962ff211e023bafaab Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis Remove unused method commit f51703177df9afcdba3778909b1e9d8b7fa4bf46 Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis commit d493097d09c6223383282ed90648853715b197ce Author: Lakshmi Gururaja Rao Date: 2018-07-24T21:13:53Z [FLINK-9899] Add more ShardConsumer metrics Checkstyle fix ---
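The gauges listed in this PR follow a common pattern: the consumer thread writes the latest measurement once per loop pass, and each gauge only reads it when the metrics system polls. The sketch below is hypothetical: it covers a subset of the listed metrics, and `java.util.function.Supplier` stands in for Flink's `Gauge` so that it compiles without Flink on the classpath. The fields are `volatile` because the reporter reads them from a different thread than the one that writes them.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical per-shard metric holder; Supplier stands in for Flink's Gauge interface. */
public final class ShardMetricsSketch {

    // Written by the consumer thread, read by the metrics reporter thread.
    private volatile long sleepTimeMillis;
    private volatile int maxNumberOfRecordsPerFetch;
    private volatile long averageRecordSizeBytes;
    private volatile long runLoopTimeNanos;
    private volatile double loopFrequencyHz;

    /** One read-only gauge per metric name, in registration order. */
    Map<String, Supplier<Number>> gauges() {
        Map<String, Supplier<Number>> gauges = new LinkedHashMap<>();
        gauges.put("sleepTimeMillis", () -> sleepTimeMillis);
        gauges.put("maxNumberOfRecordsPerFetch", () -> maxNumberOfRecordsPerFetch);
        gauges.put("averageRecordSizeBytes", () -> averageRecordSizeBytes);
        gauges.put("runLoopTimeNanos", () -> runLoopTimeNanos);
        gauges.put("loopFrequencyHz", () -> loopFrequencyHz);
        return gauges;
    }

    /** Called once per run-loop pass with the values measured in that pass. */
    void update(long sleepMillis, int maxRecords, long avgRecordBytes, long loopNanos) {
        sleepTimeMillis = sleepMillis;
        maxNumberOfRecordsPerFetch = maxRecords;
        averageRecordSizeBytes = avgRecordBytes;
        runLoopTimeNanos = loopNanos;
        loopFrequencyHz = loopNanos == 0 ? 0.0d : 1_000_000_000.0d / loopNanos;
    }

    public static void main(String[] args) {
        ShardMetricsSketch metrics = new ShardMetricsSketch();
        Map<String, Supplier<Number>> gauges = metrics.gauges();
        metrics.update(150, 1024, 1024, 500_000_000L);
        gauges.forEach((name, gauge) -> System.out.println(name + " = " + gauge.get()));
    }
}
```

Registering the gauges once and updating plain fields keeps the per-pass cost to a few volatile writes, rather than touching the metric registry inside the run loop.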
[GitHub] flink pull request #6408: [FLINK-9897] Make adaptive reads depend on run loo...
GitHub user glaksh100 opened a pull request: https://github.com/apache/flink/pull/6408 [FLINK-9897] Make adaptive reads depend on run loop time instead of fetchintervalmillis ## What is the purpose of the change [FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of Kinesis records. The PR assumed a maximum of `1/fetchIntervalMillis` reads/second. However, in the case that the run loop of the `ShardConsumer` takes more than `fetchIntervalMillis` to process records, the `maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change is to make the adaptive reads more efficient by using the actual run loop frequency to determine the number of reads/second and `maxNumberOfRecordsPerFetch`. The change also re-factors the run loop to be more modular. ## Brief change log - `processingStartTimeNanos` records start time of loop - `processingEndTimeNanos` records end time of loop - `adjustRunLoopFrequency()` adjusts end time depending on `sleepTimeMillis` (if any). - `runLoopTimeNanos` records actual run loop time. - `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on `runLoopTimeNanos` - Unused method `getAdaptiveMaxRecordsPerFetch` is removed. 
## Verifying this change This change is already covered by existing tests, such as `ShardConsumerTest`. This has also been tested against a stream with the following configuration: ``` Number of Shards: 512 Parallelism: 128 ``` ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (yes / **no**) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) You can merge this pull request into a Git repository by running: $ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6408.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6408 commit 786556b9a9a509051a14772fbbd282db73e65252 Author: Lakshmi Gururaja Rao Date: 2018-07-24T18:44:08Z [FLINK-9897] Make adaptive reads depend on run loop time instead of fetch interval millis ---
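A small worked example of the problem this PR describes, under assumed numbers: a 2 MiB/s per-shard limit, 1 KiB records, a configured 200 ms interval, and a loop that actually takes 800 ms per pass. Sizing fetches from `fetchIntervalMillis` assumes 5 reads/s, while the measured run loop only achieves 1.25 reads/s. The class and method names are hypothetical.

```java
public final class ReadRateComparison {

    // Assumed per-shard read limit of 2 MiB per second.
    static final long SHARD_BYTES_PER_SECOND = 2L * 1024L * 1024L;

    private ReadRateComparison() {}

    /** Reads per second assumed when sizing fetches from the configured interval. */
    static double assumedReadsPerSecond(long fetchIntervalMillis) {
        return 1000.0d / fetchIntervalMillis;
    }

    /** Reads per second actually achieved, derived from the measured run-loop time. */
    static double measuredReadsPerSecond(long runLoopTimeNanos) {
        return 1_000_000_000.0d / runLoopTimeNanos;
    }

    /** Records per fetch that would spend the whole per-second budget at the given read rate. */
    static int recordsPerFetch(double readsPerSecond, long averageRecordSizeBytes) {
        return (int) (SHARD_BYTES_PER_SECOND / readsPerSecond / averageRecordSizeBytes);
    }

    public static void main(String[] args) {
        long averageRecordSizeBytes = 1024;
        int fromInterval = recordsPerFetch(assumedReadsPerSecond(200), averageRecordSizeBytes);
        int fromRunLoop = recordsPerFetch(measuredReadsPerSecond(800_000_000L), averageRecordSizeBytes);
        System.out.println("interval-based: " + fromInterval + ", run-loop-based: " + fromRunLoop);
    }
}
```

With these numbers the interval-based estimate asks for 409 records per fetch, which at 1.25 reads/s uses roughly a quarter of the shard's read budget; the run-loop-based estimate asks for 1638.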
[GitHub] flink pull request #6407: [FLINK-8478][docs] Add documentation page for diff...
GitHub user florianschmidt1994 opened a pull request: https://github.com/apache/flink/pull/6407 [FLINK-8478][docs] Add documentation page for different datastream joins ## What is the purpose of the change Add a documentation page under Application Development / Streaming / Joining that describes 1. The different types of window joins in the DataStream API 2. The newly introduced interval join in the DataStream API ## Brief change log - Added a new docs page - Added images to describe common scenarios ## Verifying this change Built the documentation with `./build_docs.sh` and it renders as expected ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): no - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no - The serializers: no - The runtime per-record code paths (performance sensitive): no - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no - The S3 file system connector: no ## Documentation - Does this pull request introduce a new feature? no - If yes, how is the feature documented? not applicable For those who just want a quick look, I attached a screenshot ![](https://i.imgur.com/c78WuD7.jpg) You can merge this pull request into a Git repository by running: $ git pull https://github.com/florianschmidt1994/flink flink-8478-add-docs-for-joins Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6407.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6407 commit d2800ff33af179dd32876020d27994b9dc6579aa Author: Florian Schmidt Date: 2018-07-24T16:14:50Z [FLINK-8478] Add documentation page for different datastream joins ---
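The interval join's matching rule can be stated in a few lines. This plain-Java sketch is not the DataStream API (there, the bounds are set via `KeyedStream#intervalJoin(...).between(...)` and elements are buffered in keyed state); it only illustrates the condition: elements `a` and `b` with the same key join when `a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound`, with both bounds inclusive by default. `Event` and `intervalJoin` are hypothetical names, and the nested loops are a batch illustration only.

```java
import java.util.ArrayList;
import java.util.List;

public final class IntervalJoinSketch {

    /** Hypothetical keyed, timestamped element. */
    static final class Event {
        final String key;
        final long timestamp;
        final String value;

        Event(String key, long timestamp, String value) {
            this.key = key;
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    /**
     * Emits "a,b" for every pair with equal keys where
     * a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound.
     */
    static List<String> intervalJoin(List<Event> left, List<Event> right, long lowerBound, long upperBound) {
        List<String> joined = new ArrayList<>();
        for (Event a : left) {
            for (Event b : right) {
                boolean sameKey = a.key.equals(b.key);
                boolean inInterval = b.timestamp >= a.timestamp + lowerBound
                        && b.timestamp <= a.timestamp + upperBound;
                if (sameKey && inInterval) {
                    joined.add(a.value + "," + b.value);
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> left = List.of(new Event("k", 10, "a1"));
        List<Event> right = List.of(
                new Event("k", 8, "b1"),   // 10 - 2 = 8: on the lower bound, joins
                new Event("k", 11, "b2"),  // 10 + 1 = 11: on the upper bound, joins
                new Event("k", 12, "b3"),  // past the upper bound
                new Event("x", 10, "b4")); // different key
        System.out.println(intervalJoin(left, right, -2, 1)); // [a1,b1, a1,b2]
    }
}
```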
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6406#discussion_r204820593 --- Diff: flink-docs/README.md --- @@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full reference of the REST A To integrate a new endpoint into the generator 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that extends the new endpoint class 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main` -3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu` +3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests` --- End diff -- ah wait a second, this would run the test for all modules. urgh... ---
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6406#discussion_r204819304 --- Diff: flink-docs/README.md --- @@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full reference of the REST A To integrate a new endpoint into the generator 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that extends the new endpoint class 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main` -3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu` +3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests` --- End diff -- I intentionally left this option out. This guarantees the correctness of the output since it both tests the generator before the generation, and the completeness afterwards. There are a lot of additional options one _could_ add, but I'd prefer if we stick to the _necessary_ ones. ---
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
GitHub user GJL opened a pull request: https://github.com/apache/flink/pull/6406 [FLINK-9159][runtime] Sanity check default timeout values ## What is the purpose of the change *Set the default timeouts for resource release to sane values. Consolidate config keys and documentation.* ## Brief change log - *Set default value of `mesos.failover-timeout` to 1 week.* - *Deprecate config key `slotmanager.request-timeout`* ## Verifying this change This change added tests and can be verified as follows: - *Added test `SlotManagerConfigurationTest` to verify that slot request timeouts are set correctly.* - *Manually deployed on Mesos 1.5.0.* ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (yes / **no**) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (**yes** / no) - The serializers: (yes / **no** / don't know) - The runtime per-record code paths (performance sensitive): (yes / **no** / don't know) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know) - The S3 file system connector: (yes / **no** / don't know) ## Documentation - Does this pull request introduce a new feature? (yes / **no**) - If yes, how is the feature documented? (**not applicable** / docs / JavaDocs / not documented) cc: @tillrohrmann You can merge this pull request into a Git repository by running: $ git pull https://github.com/GJL/flink FLINK-9159 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6406.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6406 commit 96ef18e322c5623a609382057985b35971ba3234 Author: gyao Date: 2018-07-24T13:38:47Z [FLINK-9159][mesos] Set default value of mesos.failover-timeout to 1 week. 
commit 112122912d7dd78c612c1648f3e2b041ae65afa6 Author: gyao Date: 2018-07-24T13:48:27Z [FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout - Replace config key slotmanager.request-timeout with slot.request.timeout because both keys have effectively the same semantics. - Rename key slotmanager.taskmanager-timeout to resourcemanager.taskmanager-timeout. commit 787f7c1480a5676e7ce52092265b3cd051064e3c Author: gyao Date: 2018-07-24T13:55:16Z [hotfix][docs] Add -DskipTests flag to command that generates docs. ---
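Deprecating `slotmanager.request-timeout` in favor of `slot.request.timeout` means a value set under the old key should still be honored when the new key is absent. Flink's `ConfigOption` supports this through `withDeprecatedKeys(...)`; the sketch below only reimplements that lookup order over a plain `Map` to make the precedence explicit. The helper name and the default value used in `main` are placeholders, not Flink's actual defaults.

```java
import java.util.HashMap;
import java.util.Map;

public final class DeprecatedKeyLookup {

    private DeprecatedKeyLookup() {}

    /**
     * Reads {@code key}; if it is unset, tries each deprecated key in order.
     * Falls back to {@code defaultValue} when none of them is present.
     */
    static long getLong(Map<String, String> conf, String key, long defaultValue, String... deprecatedKeys) {
        String value = conf.get(key);
        for (int i = 0; value == null && i < deprecatedKeys.length; i++) {
            value = conf.get(deprecatedKeys[i]);
        }
        return value == null ? defaultValue : Long.parseLong(value.trim());
    }

    public static void main(String[] args) {
        Map<String, String> conf = new HashMap<>();
        conf.put("slotmanager.request-timeout", "60000"); // only the old key is set
        long timeout = getLong(conf, "slot.request.timeout", 300_000L, "slotmanager.request-timeout");
        System.out.println(timeout); // 60000
    }
}
```

The new key always wins when both are set, so users can migrate their configuration at their own pace without a silent change in behavior.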
[GitHub] flink pull request #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6398 ---
[GitHub] flink pull request #6405: [FLINK-8439] Add Flink shading to AWS credential p...
GitHub user azagrebin opened a pull request: https://github.com/apache/flink/pull/6405 [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop c… ## What is the purpose of the change This PR refactors S3 Hadoop and Presto file systems and adds Flink shading to AWS credential provider config. ## Brief change log - extract `AbstractS3FileSystemFactory` base class from `s3hadoop.` and `s3presto.S3FileSystemFactory`s - extract hadoop configuration logic into `HadoopConfigLoader` with Flink shading of certain Hadoop configs - add Flink shading to AWS credential provider config of `S3FileSystemFactory`s ## Verifying this change run unit tests ## Does this pull request potentially affect one of the following parts: - Dependencies (does it add or upgrade a dependency): (no) - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: (no) - The serializers: (no) - The runtime per-record code paths (performance sensitive): (no) - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no) - The S3 file system connector: (yes) ## Documentation - Does this pull request introduce a new feature? (no) - If yes, how is the feature documented? (not applicable) You can merge this pull request into a Git repository by running: $ git pull https://github.com/azagrebin/flink FLINK-8439 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6405.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6405 commit 44c6eafb6b0757deb89f4e4a7e9bb237f7336428 Author: Andrey Zagrebin Date: 2018-07-23T16:10:55Z [FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config ---
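Shading relocates bundled classes such as the AWS SDK under a different package, so class names that users put into Hadoop config values (for example a credential provider list under `fs.s3a.aws.credentials.provider`) no longer resolve unless they are rewritten the same way. The sketch below shows that rewriting step in isolation; the method name and the shaded prefix in `main` are hypothetical and are not the prefixes used by Flink's S3 file systems.

```java
import java.util.Arrays;
import java.util.stream.Collectors;

public final class ShadedClassNames {

    private ShadedClassNames() {}

    /**
     * Prepends {@code shadedPrefix} to every comma-separated class name that starts with
     * {@code originalPrefix}; other names are kept. As long as shadedPrefix does not itself
     * start with originalPrefix, applying the function twice gives the same result.
     */
    static String shadeClassList(String value, String originalPrefix, String shadedPrefix) {
        return Arrays.stream(value.split(","))
                .map(String::trim)
                .filter(name -> !name.isEmpty())
                .map(name -> name.startsWith(originalPrefix) ? shadedPrefix + name : name)
                .collect(Collectors.joining(","));
    }

    public static void main(String[] args) {
        String providers = "com.amazonaws.auth.EnvironmentVariableCredentialsProvider, "
                + "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider";
        // "org.example.shaded." is a made-up prefix for illustration only.
        System.out.println(shadeClassList(providers, "com.amazonaws.", "org.example.shaded."));
    }
}
```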
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204758822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -28,15 +35,21 @@ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. - * {@code /metrics?get=X,Y} + * {@code /metrics?get=X,Y OR /metrics?get=X,Y&&subtasks=0-4,7-10} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } * * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for backwards-compatibility. */ @Deprecated public class JobVertexMetricsHandler extends AbstractMetricsHandler { + private final Logger log = LoggerFactory.getLogger(getClass()); --- End diff -- either add this as a protected field to the `AbstractMetricsHandler`, or change it to being static. ---
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204757730 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -57,4 +70,77 @@ public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) { ? task.metrics : null; } + + @Override + protected String getRequestMetricsList(Map queryParams) { + String metricRequestsList = queryParams.get(PARAMETER_METRICS); + String subtasksList = queryParams.get(SUB_TASKS); + if (subtasksList == null || subtasksList.isEmpty()) { + return queryParams.get(PARAMETER_METRICS); --- End diff -- return `super.getRequestMetricsList(queryParams)` instead ---
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204759313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -28,15 +35,21 @@ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. - * {@code /metrics?get=X,Y} + * {@code /metrics?get=X,Y OR /metrics?get=X,Y&&subtasks=0-4,7-10} --- End diff -- should only be a single `&` ---
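With a single `&`, a request such as `/metrics?get=X,Y&subtasks=0-4,7-10` has to be expanded into per-subtask metric names. A minimal sketch of that expansion, assuming subtask metrics are addressed by names of the form `<subtaskIndex>.<metric>`; the class and method names are hypothetical and the PR may structure this differently.

```java
import java.util.ArrayList;
import java.util.List;

public final class SubtaskMetricQuery {

    private SubtaskMetricQuery() {}

    /** Parses "0-4,7-10" into [0, 1, 2, 3, 4, 7, 8, 9, 10]; single indices such as "5" are also accepted. */
    static List<Integer> parseSubtaskRanges(String ranges) {
        List<Integer> result = new ArrayList<>();
        for (String part : ranges.split(",")) {
            String trimmed = part.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            int dash = trimmed.indexOf('-');
            if (dash < 0) {
                result.add(Integer.parseInt(trimmed));
            } else {
                int from = Integer.parseInt(trimmed.substring(0, dash).trim());
                int to = Integer.parseInt(trimmed.substring(dash + 1).trim());
                for (int i = from; i <= to; i++) {
                    result.add(i);
                }
            }
        }
        return result;
    }

    /** Expands "X,Y" for each requested subtask into "subtask.metric" names, comma-separated. */
    static String expandMetrics(String metrics, String subtasks) {
        List<String> names = new ArrayList<>();
        for (int subtask : parseSubtaskRanges(subtasks)) {
            for (String metric : metrics.split(",")) {
                names.add(subtask + "." + metric.trim());
            }
        }
        return String.join(",", names);
    }

    public static void main(String[] args) {
        System.out.println(expandMetrics("X,Y", "0-1")); // 0.X,0.Y,1.X,1.Y
    }
}
```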
[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6388#discussion_r204750473 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -96,6 +96,8 @@ DEFAULT_ENV_JAVA_OPTS=""# Optional JVM args DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager) DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager) DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode +DEFAULT_YARN_CONF_DIR=""# YARN Configuration Directory, if necessary --- End diff -- indentation? ---
[GitHub] flink pull request #6393: [FLINK-9296] [table] Add support for non-windowed ...
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6393 ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720999 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -297,7 +300,13 @@ public void testFailingDataSinkTask() { Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); - super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721547 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java --- @@ -45,33 +46,30 @@ public class DataSourceTaskTest extends TaskTestBase { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; private static final int NETWORK_BUFFER_SIZE = 1024; private List outList; - - private String tempTestPath = DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test"); - - @After - public void cleanUp() { - File tempTestFile = new File(this.tempTestPath); - if(tempTestFile.exists()) { - tempTestFile.delete(); - } - } + + private final String tempTestFileName = getClass().getName() + "-dst_test"; @Test public void testDataSourceTask() { int keyCnt = 100; int valCnt = 20; this.outList = new ArrayList(); - + File tempTestFile = null; try { - InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), - this.tempTestPath, true); + tempTestFile = tempFolder.newFile(tempTestFileName); + InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), + tempTestFile, true); } catch (IOException e1) { + System.err.println(e1); --- End diff -- let's change the method signature to throw `IOException` and remove the try/catch block. ---
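Declaring `throws IOException` removes the try/catch boilerplate and lets JUnit report any I/O failure, with its stack trace, as a test error. A plain-Java sketch of the suggested shape, where `prepareInputFile` is a hypothetical stand-in for `InputFilePreparator.prepareInputFile` and a directory from `Files.createTempDirectory` stands in for the `TemporaryFolder` rule:

```java
import java.io.File;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.List;

public final class PropagateIOExceptionSketch {

    private PropagateIOExceptionSketch() {}

    /** Hypothetical stand-in for InputFilePreparator.prepareInputFile: writes one record per line. */
    static void prepareInputFile(List<String> records, File target) throws IOException {
        Files.write(target.toPath(), records, StandardCharsets.UTF_8);
    }

    // No try/catch: an IOException propagates to the caller (JUnit, in the real test).
    static int testDataSourceTask(File tempDir) throws IOException {
        File tempTestFile = new File(tempDir, "dst_test");
        prepareInputFile(List.of("1,a", "2,b"), tempTestFile);
        return Files.readAllLines(tempTestFile.toPath(), StandardCharsets.UTF_8).size();
    }

    public static void main(String[] args) throws IOException {
        File tempDir = Files.createTempDirectory("dst").toFile();
        System.out.println(testDataSourceTask(tempDir)); // 2
    }
}
```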
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721015 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -337,7 +345,13 @@ public void testFailingSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721035 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -363,7 +376,9 @@ public void testCancelDataSinkTask() throws Exception { Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720914 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -79,12 +75,11 @@ public void testDataSinkTask() { DataSinkTask testTask = new DataSinkTask<>(this.mockEnv); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721046 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -419,7 +432,13 @@ public void testCancelSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721390 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java --- @@ -45,33 +46,30 @@ public class DataSourceTaskTest extends TaskTestBase { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; private static final int NETWORK_BUFFER_SIZE = 1024; private List outList; - - private String tempTestPath = DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test"); - - @After - public void cleanUp() { - File tempTestFile = new File(this.tempTestPath); - if(tempTestFile.exists()) { - tempTestFile.delete(); - } - } + + private final String tempTestFileName = getClass().getName() + "-dst_test"; @Test public void testDataSourceTask() { int keyCnt = 100; int valCnt = 20; this.outList = new ArrayList(); - + File tempTestFile = null; try { - InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), - this.tempTestPath, true); + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720983 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -225,7 +224,13 @@ public void testSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204718919 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java --- @@ -481,10 +479,6 @@ public void testCallsForwardedToNonPartitionedBackend() throws Exception { env.getTaskKvStateRegistry()); } - static Environment getMockEnvironment() { - return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); - } - static Environment getMockEnvironment(File[] tempDirs) { --- End diff -- we could change this to a vararg method; then we could simplify calls like `getMockEnvironment(new File[] { tempFolder.newFolder() });`. ---
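Changing the parameter from `File[]` to `File...` keeps existing array-based callers compiling while allowing the shorter call form. A self-contained sketch, where the method body is a hypothetical placeholder that only copies the directories instead of building a mock `Environment`:

```java
import java.io.File;

public final class VarargsSketch {

    private VarargsSketch() {}

    /** Hypothetical stand-in for getMockEnvironment; returns a copy of the spill directories. */
    static File[] getMockEnvironment(File... tempDirs) {
        if (tempDirs.length == 0) {
            throw new IllegalArgumentException("at least one temp directory is required");
        }
        return tempDirs.clone();
    }

    public static void main(String[] args) {
        File dir = new File("spill-dir");
        // Shorter call site, e.g. getMockEnvironment(tempFolder.newFolder()) in the test.
        System.out.println(getMockEnvironment(dir).length);
        // Existing array-based call sites compile unchanged.
        System.out.println(getMockEnvironment(new File[] { dir, new File("spill-dir-2") }).length);
    }
}
```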
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204722139 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java --- @@ -240,36 +205,19 @@ public void testOnlyLevel2NestedDirectories() { */ @Test public void testTwoNestedDirectoriesWithFilteredFilesTrue() { - - String sep = System.getProperty("file.separator"); - try { String firstLevelDir = TestFileUtils.randomFileName(); String secondLevelDir = TestFileUtils.randomFileName(); String thirdLevelDir = TestFileUtils.randomFileName(); String secondLevelFilterDir = "_"+TestFileUtils.randomFileName(); String thirdLevelFilterDir = "_"+TestFileUtils.randomFileName(); - File nestedDir = new File(tempPath + sep + firstLevelDir); - nestedDir.mkdirs(); - nestedDir.deleteOnExit(); - - File insideNestedDir = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir); - insideNestedDir.mkdirs(); - insideNestedDir.deleteOnExit(); - File insideNestedDirFiltered = new File(tempPath + sep + firstLevelDir + sep + secondLevelFilterDir); - insideNestedDirFiltered.mkdirs(); - insideNestedDirFiltered.deleteOnExit(); - File filteredFile = new File(tempPath + sep + firstLevelDir + sep + "_IWillBeFiltered"); - filteredFile.createNewFile(); - filteredFile.deleteOnExit(); - - File nestedNestedDir = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir + sep + thirdLevelDir); - nestedNestedDir.mkdirs(); - nestedNestedDir.deleteOnExit(); - File nestedNestedDirFiltered = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir + sep + thirdLevelFilterDir); - nestedNestedDirFiltered.mkdirs(); - nestedNestedDirFiltered.deleteOnExit(); + File nestedNestedDirFiltered = tempFolder.newFolder(firstLevelDir, secondLevelDir, thirdLevelDir, thirdLevelFilterDir); + File nestedNestedDir = nestedNestedDirFiltered.getParentFile(); + File insideNestedDir = nestedNestedDir.getParentFile(); + File nestedDir = 
insideNestedDir.getParentFile(); + File insideNestedDirFiltered = tempFolder.newFolder(firstLevelDir, secondLevelFilterDir); + new File(nestedDir, "_IWillBeFiltered"); --- End diff -- This file isn't explicitly created anymore. ---
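`new File(nestedDir, "_IWillBeFiltered")` only constructs a path; nothing is written to disk until something like `createNewFile()` is called. A plain-Java sketch of the fixture with the file created explicitly, using `Files.createDirectories` in place of `TemporaryFolder#newFolder(String...)`; the directory names are simplified placeholders for the random names in the test.

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public final class NestedFixtureSketch {

    private NestedFixtureSketch() {}

    /** Builds root/first/second/third plus root/first/_IWillBeFiltered and returns the filtered file. */
    static File createFixture(Path root) throws IOException {
        // Like tempFolder.newFolder("first", "second", "third"): creates all missing parents.
        Path third = Files.createDirectories(root.resolve("first").resolve("second").resolve("third"));
        File nestedDir = third.getParent().getParent().toFile(); // root/first
        // Constructing the File is not enough; createNewFile() actually puts it on disk.
        File filteredFile = new File(nestedDir, "_IWillBeFiltered");
        if (!filteredFile.createNewFile()) {
            throw new IOException("file already exists: " + filteredFile);
        }
        return filteredFile;
    }

    public static void main(String[] args) throws IOException {
        Path root = Files.createTempDirectory("nested-files");
        System.out.println(createFixture(root).exists()); // true
    }
}
```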
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204719730 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java --- @@ -127,7 +132,7 @@ public void testSlotAllocation() throws Exception { TestingUtils.infiniteTime()); final File[] taskExecutorLocalStateRootDirs = - new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")}; + new File[]{ tempFolder.newFolder("localRecovery") }; --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720974 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -141,7 +136,13 @@ public void testUnionDataSinkTask() { DataSinkTask testTask = new DataSinkTask<>(this.mockEnv); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204719565 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java --- @@ -218,7 +224,7 @@ public void TestAnonymousClass() throws IOException { @Test public void TestExtendIdentifier() throws IOException { - File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar"); + File out = tempFolder.newFile("jarcreatortest.jar"); --- End diff -- to preserve the existing behavior of passing a file that does not exist, do this instead: ``` File out = new File(tempFolder.getRoot(), "jarcreatortest.jar"); ``` Also applies to the other tests in this file. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720758

--- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java ---
@@ -49,22 +50,17 @@ import static org.junit.Assert.assertTrue;
 public class DataSinkTaskTest extends TaskTestBase {
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
 private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class);
 private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
 private static final int NETWORK_BUFFER_SIZE = 1024;
- private final String tempTestPath = constructTestPath(DataSinkTaskTest.class, "dst_test");
-
- @After
- public void cleanUp() {
- File tempTestFile = new File(this.tempTestPath);
- if(tempTestFile.exists()) {
- tempTestFile.delete();
- }
- }
+ private final String tempTestFileName = getClass().getName() + "-dst_test";
--- End diff --

Since this was only used to prevent name clashes between tests, it should now be redundant and can be removed.

---
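The reasoning here is that a per-test temporary folder already isolates tests, so the `getClass().getName()` prefix no longer prevents anything. A JDK-only sketch of that argument follows; `PerTestFolderSketch`, `newTestFolder`, `createOutput` and `deleteRecursively` are hypothetical names standing in for what the `TemporaryFolder` rule does before and after each test. Two folders can hold the same plain file name without a clash, and recursive deletion takes the place of the hand-written `@After` method.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.stream.Stream;

/** JDK-only sketch: per-test folders make name prefixes unnecessary. */
public class PerTestFolderSketch {

    /** Creates a fresh, uniquely named folder, as the rule does before each test. */
    public static Path newTestFolder() {
        try {
            return Files.createTempDirectory("per-test");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Creates the output file; a clash surfaces as a wrapped FileAlreadyExistsException. */
    public static Path createOutput(Path folder, String fileName) {
        try {
            return Files.createFile(folder.resolve(fileName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Deletes a folder tree, deepest entries first, as the rule does after each test. */
    public static void deleteRecursively(Path root) {
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(p -> p.toFile().delete());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        // Two tests, each with its own folder, both using the plain name "dst_test".
        Path folderA = newTestFolder();
        Path folderB = newTestFolder();
        Path outA = createOutput(folderA, "dst_test");
        Path outB = createOutput(folderB, "dst_test"); // no clash: different parents
        System.out.println(outA.equals(outB)); // false

        // Takes the place of the removed @After cleanUp() method.
        deleteRecursively(folderA);
        deleteRecursively(folderB);
        System.out.println(Files.exists(folderA)); // false
    }
}
```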
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720065

--- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java ---
@@ -338,22 +279,11 @@ public void testGetStatisticsMultipleNestedFiles() {
 String secondLevelDir = TestFileUtils.randomFileName();
 String secondLevelDir2 = TestFileUtils.randomFileName();
- File nestedDir = new File(tempPath + System.getProperty("file.separator")
- + firstLevelDir);
- nestedDir.mkdirs();
- nestedDir.deleteOnExit();
+ File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir);
+ File insideNestedDir2 = tempFolder.newFolder(firstLevelDir, secondLevelDir2);
+ File nestedDir = insideNestedDir.getParentFile();
- File insideNestedDir = new File(tempPath + System.getProperty("file.separator")
- + firstLevelDir + System.getProperty("file.separator") + secondLevelDir);
- insideNestedDir.mkdirs();
- insideNestedDir.deleteOnExit();
-
- File insideNestedDir2 = new File(tempPath + System.getProperty("file.separator")
- + firstLevelDir + System.getProperty("file.separator") + secondLevelDir2);
- insideNestedDir2.mkdirs();
- insideNestedDir2.deleteOnExit();
-
- // create a file in the first-level and two files in the nested dir
+ // create a file in the first-level and two files in the nested dir
--- End diff --

revert indentation change

---
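The new code relies on `newFolder(String...)` creating each path level in turn while tolerating an intermediate folder that already exists, which is why the second call can reuse `firstLevelDir`. As far as we know, JUnit 4 only fails when the last folder already exists. The JDK-only sketch below is modeled on that behavior; `NestedFolderSketch`, `createRoot` and `newFolder(File, String...)` are hypothetical names, not JUnit's API.

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;

/** JDK-only sketch of nested folder creation below a temporary root. */
public class NestedFolderSketch {

    /** Creates a fresh, empty root directory, standing in for TemporaryFolder#getRoot(). */
    public static File createRoot() {
        try {
            return Files.createTempDirectory("nested-sketch").toFile();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * Creates each level below root, ignoring levels that already exist,
     * but fails if the last level already exists.
     */
    public static File newFolder(File root, String... names) {
        File current = root;
        for (int i = 0; i < names.length; i++) {
            current = new File(current, names[i]);
            boolean created = current.mkdir();
            if (!created && i == names.length - 1) {
                throw new UncheckedIOException(new IOException(
                        "a folder with the name '" + names[i] + "' already exists"));
            }
        }
        return current;
    }

    public static void main(String[] args) {
        File root = createRoot();
        File insideNestedDir = newFolder(root, "firstLevel", "secondLevel");
        File insideNestedDir2 = newFolder(root, "firstLevel", "secondLevel2");
        File nestedDir = insideNestedDir.getParentFile();

        // Both nested folders share the first-level folder as their parent.
        System.out.println(nestedDir.equals(insideNestedDir2.getParentFile())); // true
        System.out.println(nestedDir.equals(new File(root, "firstLevel")));     // true

        // Clean up, innermost folders first.
        insideNestedDir.delete();
        insideNestedDir2.delete();
        nestedDir.delete();
        root.delete();
    }
}
```

This also removes the manual `file.separator` concatenation and `deleteOnExit()` calls, since the path is built level by level under a root that is discarded after the test.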
[GitHub] flink pull request #6341: [FLINK-5750] Incorrect translation of n-ary Union
Github user asfgit closed the pull request at: https://github.com/apache/flink/pull/6341 ---