[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205106925 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java --- @@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); int loadFactor = Integer.parseInt(params.getRequired("loadFactor")); String outputPath = params.getRequired("outputPath"); + boolean infinite = params.getBoolean("infinite", false); ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); int numKeys = loadFactor * 128 * 1024; - DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4); + DataSet<Tuple2<String, Integer>> x1Keys; DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4); DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4); + if (infinite) { + x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0); --- End diff -- This now filters out all data. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205108213 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * + * String: key, can be repeated. + * Integer: uniformly distributed int between 0 and 127 + * + * + * If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. --- End diff -- outdated ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205108561 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.Preconditions; + +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * + * String: key, can be repeated. + * Integer: uniformly distributed int between 0 and 127 + * + * + * If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. 
+ */ +public class Generator implements InputFormat, GenericInputSplit> { + + // total number of records + private final long numRecords; + // total number of keys + private final long numKeys; + + // records emitted per partition + private long recordsPerPartition; + // number of keys per partition + private long keysPerPartition; + + // number of currently emitted records + private long recordCnt; + + // id of current partition + private int partitionId; + + private final boolean infinite; + + public Generator(long numKeys, int recordsPerKey) { + this(numKeys, recordsPerKey, false); + } + + private Generator(long numKeys, int recordsPerKey, boolean infinite) { + this.numKeys = numKeys; + this.numRecords = numKeys * recordsPerKey; + this.infinite = infinite; + } + + public static Generator infinite() { + return new Generator(Long.MAX_VALUE, 1, true); + } + + @Override + public void configure(Configuration parameters) { } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public GenericInputSplit[] createInputSplits(int minNumSplits) { + + GenericInputSplit[] splits = new GenericInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new GenericInputSplit(i, minNumSplits); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(GenericInputSplit split) throws IOException { + this.partitionId = split.getSplitNumber(); + // total number of partitions + int numPartitions = split.getTotalNumberOfSplits(); + + // ensure even distribution of records and keys + Preconditions.checkArgument( + numRecords % numPartitions == 0, + "Records cannot be evenly distributed among partitions"); + Preconditions.checkArgument( + numKeys % numPartitions == 0, + "Keys cannot be evenly distributed among partitions"); + + 
this.recordsPerPartition = numRecords / numPartitions; + this.keysPerPartition = numKeys / numPartitions; + + this.recordCnt = 0; + } + + @Override + public boolean reache
[GitHub] flink pull request #6407: [FLINK-9877][docs] Add documentation page for diff...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6407#discussion_r205097411 --- Diff: docs/dev/stream/operators/joining.md --- @@ -0,0 +1,286 @@ +--- +title: "Joining" +nav-id: streaming_joins +nav-show_overview: true +nav-parent_id: streaming +nav-pos: 10 +--- + + +* toc +{:toc} + +# Window Join +A window join will join the elements of two streams that share a common key and lie in the same window. These windows can be defined by using a [window assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) and are evaluated on a union of both streams. This is especially important for session window joins, which will be demonstrated below. + +The joined elements are then passed to a user-defined `JoinFunction` or `FlatJoinFunction` where the user can perform transformations on the joined elements. + +The general usage always looks like the following: + +```java +stream.join(otherStream) +.where() +.equalTo() +.window() +.apply() +``` + +Some notes on semantics: +- The creation of pairwise combinations of elements of the two streams behaves like an inner-join, meaning elements from one stream will not be emitted if they don't have a corresponding element from the other stream to be joined with. +- Those elements that do get joined will have as their timestamp the largest timestamp that still lies in the respective window. For example a window with `[5, 10)` as its boundaries would result in the joined elements having nine as their timestamp. + +In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios. + +## Tumbling Window +When performing a tumbling window join, all elements with a common key and a common tumbling window are joined as pairwise combinations and passed on to the user-defined function.
Because this behaves like an inner join, elements of one stream that do not have elements from another stream in their tumbling window are not emitted! + +### Example + + +In our example we are defining a tumbling window with the size of 2 milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The image shows the pairwise combinations of all elements in each window which will be passed on to the user-defined function. You can also see how in the tumbling window `[6,7]` nothing is emitted because no elements from the green stream exist to be joined with the orange elements ⑥ and ⑦. + + + + +```java --- End diff -- please use { % highlight java % } syntax ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205087147 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- An infinite flag should be sufficient. ---
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6378 merging. ---
[GitHub] flink issue #6402: [FLINK-9914][docs] Update Docker docs
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6402 merging. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205081350 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- But the loop executes `wait_job_running ${JOB_ID}`, so don't we know it _actually_ restarted? ---
[GitHub] flink issue #6409: [FLINK-9899][Kinesis Connector] Add comprehensive per-sha...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6409 New metrics should be documented in https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#kinesis-connectors. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205078058 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- We've already verified that the job is properly restarted in the jobmanager-kill loop, so this should only be about shutting down the job. ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205069073 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java --- @@ -66,14 +59,21 @@ public static void main(String[] args) throws Exception { ParameterTool params = ParameterTool.fromArgs(args); int loadFactor = Integer.parseInt(params.getRequired("loadFactor")); String outputPath = params.getRequired("outputPath"); + String source = params.get("source", null); --- End diff -- source parameter is not documented in the javadocs ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070681 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { --- End diff -- same as below ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070467 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070430 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { --- End diff -- move into shared `common-ha.sh` ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205072007 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt --- End diff -- Couldn't we simply cancel the job? ---
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070024 --- Diff: flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java --- @@ -0,0 +1,166 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.batch.tests; + +import org.apache.flink.api.common.io.DefaultInputSplitAssigner; +import org.apache.flink.api.common.io.InputFormat; +import org.apache.flink.api.common.io.statistics.BaseStatistics; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.io.GenericInputSplit; +import org.apache.flink.core.io.InputSplitAssigner; +import org.apache.flink.util.FileUtils; +import org.apache.flink.util.Preconditions; + +import java.io.File; +import java.io.IOException; + +/** + * InputFormat that generates a deterministic DataSet of Tuple2(String, Integer) + * + * String: key, can be repeated. + * Integer: uniformly distributed int between 0 and 127 + * + * + * If control path was provided, as long as this file is empty dummy elements with value equal to -1 will be emitted. 
+ */ +public class Generator implements InputFormat, GenericInputSplit> { + + // total number of records + private final long numRecords; + // total number of keys + private final long numKeys; + + // records emitted per partition + private long recordsPerPartition; + // number of keys per partition + private long keysPerPartition; + + // number of currently emitted records + private long recordCnt; + + // id of current partition + private int partitionId; + + private final String path; + + private boolean hasStarted = true; + + public Generator(long numKeys, int recordsPerKey) { + this(numKeys, recordsPerKey, null); + } + + public Generator(long numKeys, int recordsPerKey, String controlPath) { + this.numKeys = numKeys; + this.numRecords = numKeys * recordsPerKey; + this.path = controlPath; + } + + @Override + public void configure(Configuration parameters) { } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStatistics) { + return null; + } + + @Override + public GenericInputSplit[] createInputSplits(int minNumSplits) { + + GenericInputSplit[] splits = new GenericInputSplit[minNumSplits]; + for (int i = 0; i < minNumSplits; i++) { + splits[i] = new GenericInputSplit(i, minNumSplits); + } + return splits; + } + + @Override + public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] inputSplits) { + return new DefaultInputSplitAssigner(inputSplits); + } + + @Override + public void open(GenericInputSplit split) throws IOException { + this.partitionId = split.getSplitNumber(); + // total number of partitions + int numPartitions = split.getTotalNumberOfSplits(); + + // ensure even distribution of records and keys + Preconditions.checkArgument( + numRecords % numPartitions == 0, + "Records cannot be evenly distributed among partitions"); + Preconditions.checkArgument( + numKeys % numPartitions == 0, + "Keys cannot be evenly distributed among partitions"); + + this.recordsPerPartition = numRecords / numPartitions; + 
this.keysPerPartition = numKeys / numPartitions; + + this.recordCnt = 0; + + if (this.path != null) {
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205071341 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { +local OUTPUT=$FLINK_DIR/log/*.out +local JM_FAILURES=$1 +local EXIT_CODE=0 + +# verify that we have no alerts +if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then +echo "FAILURE: Alerts found at the general purpose DataSet job." +EXIT_CODE=1 +fi + +# checks that all apart from the first JM recover the failed jobgraph. +if ! 
[ `grep -r --include '*standalonesession*.log' 'Recovered SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq ${JM_FAILURES} ]; then +echo "FAILURE: A JM did not take over." +EXIT_CODE=1 +fi + +if [[ $EXIT_CODE != 0 ]]; then +echo "One or more tests FAILED." +exit $EXIT_CODE +fi +} + +function jm_watchdog() { +local EXPECTED_JMS=$1 +local IP_PORT=$2 + +while true; do +local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' | wc -l`; +local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS)) +for (( c=0; c
[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6415#discussion_r205070634 --- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh --- @@ -0,0 +1,139 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +source "$(dirname "$0")"/common.sh + +TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar + +JM_WATCHDOG_PID=0 + +# flag indicating if we have already cleared up things after a test +CLEARED=0 + +function stop_cluster_and_watchdog() { +if [ ${CLEARED} -eq 0 ]; then + +if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then +echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}" +kill ${JM_WATCHDOG_PID} 2> /dev/null +wait ${JM_WATCHDOG_PID} 2> /dev/null +fi + +CLEARED=1 +fi +} + +function verify_logs() { --- End diff -- move this into a shared `common-ha.sh` and parameterize the strings to search for, so it's also applicable to the streaming test. ---
[GitHub] flink pull request #6416: [FLINK-9942][rest] Guard handlers against null fie...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6416 [FLINK-9942][rest] Guard handlers against null fields ## What is the purpose of the change This PR prevents some NPEs that could arise if fields in the request are set to null or are omitted. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9942 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6416.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6416 commit 3e9046864d566ef6aa0d9af7a09771ebb5fe556b Author: zentol Date: 2018-07-25T06:39:29Z [FLINK-9942][rest] Guard handlers against null fields ---
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6406#discussion_r204820593 --- Diff: flink-docs/README.md --- @@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full reference of the REST A To integrate a new endpoint into the generator 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that extends the new endpoint class 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main` -3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu` +3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests` --- End diff -- ah wait a second, this would run the test for all modules. urgh... ---
[GitHub] flink issue #6396: [FLINK-9806][docs] Add canonical link element to docs
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6396 we can keep the hotfix in this PR but it shouldn't be squashed into the main commit as it is unrelated. ---
[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6406#discussion_r204819304 --- Diff: flink-docs/README.md --- @@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full reference of the REST A To integrate a new endpoint into the generator 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that extends the new endpoint class 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main` -3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu` +3. Regenerate the documentation by running `mvn package -Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests` --- End diff -- I intentionally left this option out. This guarantees the correctness of the output since it both tests the generator before the generation, and the completeness afterwards. There's a lot of additional options one _could_ add, but I'd prefer if we stick to the _necessary_ ones. ---
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204758822 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -28,15 +35,21 @@ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. - * {@code /metrics?get=X,Y} + * {@code /metrics?get=X,Y OR /metrics?get=X,Y&=0-4,7-10} * The handler will then return a list containing the values of the requested metrics. * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] } * * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and is only kept for backwards-compatibility. */ @Deprecated public class JobVertexMetricsHandler extends AbstractMetricsHandler { + private final Logger log = LoggerFactory.getLogger(getClass()); --- End diff -- either add this as a protected field to the `AbstractMetricsHandler`, or change it to being static. ---
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204757730 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -57,4 +70,77 @@ public JobVertexMetricsHandler(Executor executor, MetricFetcher fetcher) { ? task.metrics : null; } + + @Override + protected String getRequestMetricsList(Map queryParams) { + String metricRequestsList = queryParams.get(PARAMETER_METRICS); + String subtasksList = queryParams.get(SUB_TASKS); + if (subtasksList == null || subtasksList.isEmpty()) { + return queryParams.get(PARAMETER_METRICS); --- End diff -- return `super.getRequestMetricsList(queryParams)` instead ---
[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6400#discussion_r204759313 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java --- @@ -28,15 +35,21 @@ * {@code {"available": [ { "name" : "X", "id" : "X" } ] } } * * If the query parameters do contain a "get" parameter, a comma-separated list of metric names is expected as a value. - * {@code /metrics?get=X,Y} + * {@code /metrics?get=X,Y OR /metrics?get=X,Y&=0-4,7-10} --- End diff -- should only be a single `&` ---
[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6388#discussion_r204750473 --- Diff: flink-dist/src/main/flink-bin/bin/config.sh --- @@ -96,6 +96,8 @@ DEFAULT_ENV_JAVA_OPTS=""# Optional JVM args DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args (JobManager) DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args (TaskManager) DEFAULT_ENV_SSH_OPTS="" # Optional SSH parameters running in cluster mode +DEFAULT_YARN_CONF_DIR=""# YARN Configuration Directory, if necessary --- End diff -- indentation? ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720999 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -297,7 +300,13 @@ public void testFailingDataSinkTask() { Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); - super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721547 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java --- @@ -45,33 +46,30 @@ public class DataSourceTaskTest extends TaskTestBase { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; private static final int NETWORK_BUFFER_SIZE = 1024; private List outList; - - private String tempTestPath = DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test"); - - @After - public void cleanUp() { - File tempTestFile = new File(this.tempTestPath); - if(tempTestFile.exists()) { - tempTestFile.delete(); - } - } + + private final String tempTestFileName = getClass().getName() + "-dst_test"; @Test public void testDataSourceTask() { int keyCnt = 100; int valCnt = 20; this.outList = new ArrayList(); - + File tempTestFile = null; try { - InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), - this.tempTestPath, true); + tempTestFile = tempFolder.newFile(tempTestFileName); + InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), + tempTestFile, true); } catch (IOException e1) { + System.err.println(e1); --- End diff -- let's change the method signature to throw `IOException` and remove the try/catch block. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721015 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -337,7 +345,13 @@ public void testFailingSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockFailingOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721035 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -363,7 +376,9 @@ public void testCancelDataSinkTask() throws Exception { Configuration stubParams = new Configuration(); super.getTaskConfig().setStubParameters(stubParams); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720914 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -79,12 +75,11 @@ public void testDataSinkTask() { DataSinkTask testTask = new DataSinkTask<>(this.mockEnv); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721046 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -419,7 +432,13 @@ public void testCancelSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204721390 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java --- @@ -45,33 +46,30 @@ public class DataSourceTaskTest extends TaskTestBase { + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); + private static final int MEMORY_MANAGER_SIZE = 1024 * 1024; private static final int NETWORK_BUFFER_SIZE = 1024; private List outList; - - private String tempTestPath = DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test"); - - @After - public void cleanUp() { - File tempTestFile = new File(this.tempTestPath); - if(tempTestFile.exists()) { - tempTestFile.delete(); - } - } + + private final String tempTestFileName = getClass().getName() + "-dst_test"; @Test public void testDataSourceTask() { int keyCnt = 100; int valCnt = 20; this.outList = new ArrayList(); - + File tempTestFile = null; try { - InputFilePreparator.prepareInputFile(new UniformRecordGenerator(keyCnt, valCnt, false), - this.tempTestPath, true); + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720983 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -225,7 +224,13 @@ public void testSortingDataSinkTask() { super.getTaskConfig().setFilehandlesInput(0, 8); super.getTaskConfig().setSpillingThresholdInput(0, 0.8f); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204718919 --- Diff: flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java --- @@ -481,10 +479,6 @@ public void testCallsForwardedToNonPartitionedBackend() throws Exception { env.getTaskKvStateRegistry()); } - static Environment getMockEnvironment() { - return getMockEnvironment(new File[] { new File(System.getProperty("java.io.tmpdir")) }); - } - static Environment getMockEnvironment(File[] tempDirs) { --- End diff -- we could change this to a vararg method; then we could simplify calls like `getMockEnvironment(new File[] { tempFolder.newFolder() });`. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204722139 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java --- @@ -240,36 +205,19 @@ public void testOnlyLevel2NestedDirectories() { */ @Test public void testTwoNestedDirectoriesWithFilteredFilesTrue() { - - String sep = System.getProperty("file.separator"); - try { String firstLevelDir = TestFileUtils.randomFileName(); String secondLevelDir = TestFileUtils.randomFileName(); String thirdLevelDir = TestFileUtils.randomFileName(); String secondLevelFilterDir = "_"+TestFileUtils.randomFileName(); String thirdLevelFilterDir = "_"+TestFileUtils.randomFileName(); - File nestedDir = new File(tempPath + sep + firstLevelDir); - nestedDir.mkdirs(); - nestedDir.deleteOnExit(); - - File insideNestedDir = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir); - insideNestedDir.mkdirs(); - insideNestedDir.deleteOnExit(); - File insideNestedDirFiltered = new File(tempPath + sep + firstLevelDir + sep + secondLevelFilterDir); - insideNestedDirFiltered.mkdirs(); - insideNestedDirFiltered.deleteOnExit(); - File filteredFile = new File(tempPath + sep + firstLevelDir + sep + "_IWillBeFiltered"); - filteredFile.createNewFile(); - filteredFile.deleteOnExit(); - - File nestedNestedDir = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir + sep + thirdLevelDir); - nestedNestedDir.mkdirs(); - nestedNestedDir.deleteOnExit(); - File nestedNestedDirFiltered = new File(tempPath + sep + firstLevelDir + sep + secondLevelDir + sep + thirdLevelFilterDir); - nestedNestedDirFiltered.mkdirs(); - nestedNestedDirFiltered.deleteOnExit(); + File nestedNestedDirFiltered = tempFolder.newFolder(firstLevelDir, secondLevelDir, thirdLevelDir, thirdLevelFilterDir); + File nestedNestedDir = nestedNestedDirFiltered.getParentFile(); + File insideNestedDir = nestedNestedDir.getParentFile(); + File nestedDir = 
insideNestedDir.getParentFile(); + File insideNestedDirFiltered = tempFolder.newFolder(firstLevelDir, secondLevelFilterDir); + new File(nestedDir, "_IWillBeFiltered"); --- End diff -- This file isn't explicitly created anymore. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204719730 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java --- @@ -127,7 +132,7 @@ public void testSlotAllocation() throws Exception { TestingUtils.infiniteTime()); final File[] taskExecutorLocalStateRootDirs = - new File[]{new File(System.getProperty("java.io.tmpdir"), "localRecovery")}; + new File[]{ tempFolder.newFolder("localRecovery") }; --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720974 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -141,7 +136,13 @@ public void testUnionDataSinkTask() { DataSinkTask testTask = new DataSinkTask<>(this.mockEnv); - super.registerFileOutputTask(MockOutputFormat.class, new File(tempTestPath).toURI().toString()); + File tempTestFile = null; + try { + tempTestFile = tempFolder.newFile(tempTestFileName); --- End diff -- same issue as in `JarFileCreatorTest` ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204719565 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java --- @@ -218,7 +224,7 @@ public void TestAnonymousClass() throws IOException { @Test public void TestExtendIdentifier() throws IOException { - File out = new File(System.getProperty("java.io.tmpdir"), "jarcreatortest.jar"); + File out = tempFolder.newFile("jarcreatortest.jar"); --- End diff -- to preserve the existing behavior of passing a file that does not exist, do this instead: ``` File out = new File(tempFolder.getRoot(), "jarcreatortest.jar"); ``` Also applies to the other tests in this file. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720758 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java --- @@ -49,22 +50,17 @@ import static org.junit.Assert.assertTrue; public class DataSinkTaskTest extends TaskTestBase { + + @Rule + public TemporaryFolder tempFolder = new TemporaryFolder(); private static final Logger LOG = LoggerFactory.getLogger(DataSinkTaskTest.class); private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024; private static final int NETWORK_BUFFER_SIZE = 1024; - private final String tempTestPath = constructTestPath(DataSinkTaskTest.class, "dst_test"); - - @After - public void cleanUp() { - File tempTestFile = new File(this.tempTestPath); - if(tempTestFile.exists()) { - tempTestFile.delete(); - } - } + private final String tempTestFileName = getClass().getName() + "-dst_test"; --- End diff -- since this was only used to prevent name clashes between tests it should now be redundant and can be removed. ---
[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6399#discussion_r204720065 --- Diff: flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java --- @@ -338,22 +279,11 @@ public void testGetStatisticsMultipleNestedFiles() { String secondLevelDir = TestFileUtils.randomFileName(); String secondLevelDir2 = TestFileUtils.randomFileName(); - File nestedDir = new File(tempPath + System.getProperty("file.separator") - + firstLevelDir); - nestedDir.mkdirs(); - nestedDir.deleteOnExit(); + File insideNestedDir = tempFolder.newFolder(firstLevelDir, secondLevelDir); + File insideNestedDir2 = tempFolder.newFolder(firstLevelDir, secondLevelDir2); + File nestedDir = insideNestedDir.getParentFile(); - File insideNestedDir = new File(tempPath + System.getProperty("file.separator") - + firstLevelDir + System.getProperty("file.separator") + secondLevelDir); - insideNestedDir.mkdirs(); - insideNestedDir.deleteOnExit(); - - File insideNestedDir2 = new File(tempPath + System.getProperty("file.separator") - + firstLevelDir + System.getProperty("file.separator") + secondLevelDir2); - insideNestedDir2.mkdirs(); - insideNestedDir2.deleteOnExit(); - - // create a file in the first-level and two files in the nested dir + // create a file in the first-level and two files in the nested dir --- End diff -- revert indentation change ---
[GitHub] flink issue #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#testWat...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6398 well, that's a fairly general statement, isn't it :/ Unless we find any mention anywhere of a major JVM that is suited for Flink applications and does not adhere to the JLS in this regard, I would go with a volatile long. Personally, I never follow inspections at face value: we don't know how up-to-date this inspection is, what information it is based on, or which JVMs it applies to (it doesn't mention even a single example), and, as with all JVM-/environment-specific inspections, its applicability is questionable. ---
[GitHub] flink issue #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#testWat...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6398 Do you have a source for volatile longs not being supported in all JVM implementations? I'm asking since the (expected) behavior is defined in the [Java Language Specification](https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.7). ---
[GitHub] flink issue #6388: [FLINK-6222] Allow passing env variables to start scripts...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6388 If a feature isn't visibly documented, chances are no one will use it ;) I'm not sure if the configuration page is the right place to put it, as it so far deals exclusively with settings set in `flink-conf.yaml`. Most notably, this line in the introduction sticks out: ``` All configuration is done in conf/flink-conf.yaml, which is expected to be a flat collection of YAML key value pairs with format key: value. ``` You could name it `flink-client-env-sh`; that would make it more obvious that it only applies to the client. However, I have to ask: why a separate file in the first place? We already have config options for setting environment variables (`env.java.opts`); couldn't we introduce a separate option for clients? ---
[GitHub] flink pull request #6395: [FLINK-9900][tests] Harden ZooKeeperHighAvailabili...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6395 [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase ## What is the purpose of the change This PR makes a few modifications to the `ZooKeeperHighAvailabilityITCase` to reduce the chances for intermittent test failures and timeouts. Changes: ## 1) The test was moving files out of the HA storage directory with a simple loop using `File#renameTo`. The test enforced that the move is successful; however, since old checkpoints may be deleted asynchronously, this may not always be the case. We now use a `FileVisitor` and ignore `IOExceptions` that occur while moving. If no checkpoint file could be moved, the test will still fail. ## 2) After the checkpoint files were moved out of the HA storage directory, the job is thrown into a restart loop. To verify the restart behavior, the test was polling the job state and checking for the `RESTARTING` and `FAILING` states. Due to the small size of the job, it is in these states only for a short time, effectively adding a race condition. Thus this loop may run for longer than anticipated; the largest outlier I got locally was 50 seconds, which isn't _that_ far off from the 2 minute timeout. I suspect this to be the failure cause raised in the JIRA, but I can't guarantee it. Instead, we now access the `fullRestarts` metric using a custom reporter to check how many restarts have occurred. The actual _state transitions_ should be irrelevant to the test. 
You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9900 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6395.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6395 commit b8827dc3723558c52ad567bf88f24ae34129ea08 Author: zentol Date: 2018-07-23T14:21:32Z [FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase ---
[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6378 @zhangminglei @YCjia Please do not push changes for the sole purpose of re-triggering Travis. We already know that they passed (due to the travis fork), and realistically there's no way for the parent pom to influence this test. ---
[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6372#discussion_r204018623 --- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh --- @@ -51,6 +51,7 @@ run_test "Shaded Hadoop S3A end-to-end test" "$END_TO_END_DIR/test-scripts/test_ run_test "Shaded Presto S3 end-to-end test" "$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh" run_test "Hadoop-free Wordcount end-to-end test" "$END_TO_END_DIR/test-scripts/test_hadoop_free.sh" run_test "Distributed cache end-to-end test" "$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh" +run_test "Wordcount end-to-end test in docker env" "$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh" --- End diff -- if we can't run it on travis it shouldn't be in this script? ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203998139 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile --- @@ -0,0 +1,159 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# +# This image is modified version of Knappek/docker-hadoop-secure +# * Knappek/docker-hadoop-secure <https://github.com/Knappek/docker-hadoop-secure> +# +# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend it to start a proper kerberized Hadoop cluster: +# * Lewuathe/docker-hadoop-cluster <https://github.com/Lewuathe/docker-hadoop-cluster> +# +# Author: Aljoscha Krettek +# Date: 2018 May, 15 +# +# Creates multi-node, kerberized Hadoop cluster on Docker + +FROM sequenceiq/pam:ubuntu-14.04 +MAINTAINER aljoscha + +USER root + +RUN addgroup hadoop +RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs +RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn +RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred + +RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user + +# install dev tools +RUN apt-get update +RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync unzip + +# Kerberos client +RUN apt-get install krb5-user -y +RUN mkdir -p /var/log/kerberos +RUN touch /var/log/kerberos/kadmind.log + +# passwordless ssh +RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa +RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key +RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key +RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa +RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys + +# java +RUN mkdir -p /usr/java/default && \ + curl -Ls 'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz' -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \ + tar --strip-components=1 -xz -C /usr/java/default/ + +ENV JAVA_HOME /usr/java/default +ENV PATH $PATH:$JAVA_HOME/bin + +RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip' +RUN unzip jce_policy-8.zip +RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security + +ENV HADOOP_VERSION=2.8.4 --- End diff -- I agree, but for now we still have to ensure that the hadoop version in flink-dist matches, no? ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203997764 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh --- @@ -0,0 +1,121 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +: ${HADOOP_PREFIX:=/usr/local/hadoop} + +$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh + +rm /tmp/*.pid + +# installing libraries if any - (resource urls added comma separated to the ACP system variable) +cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do echo == $cp; curl -LO $cp ; done; cd - + +# kerberos client +sed -i "s/EXAMPLE.COM/${KRB_REALM}/g" /etc/krb5.conf --- End diff -- yeah nvm, I doubt introducing a placeholder really fixes things :/ ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203974298 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh --- @@ -0,0 +1,121 @@ +#!/bin/bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +: ${HADOOP_PREFIX:=/usr/local/hadoop} + +$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh + +rm /tmp/*.pid + +# installing libraries if any - (resource urls added comma separated to the ACP system variable) +cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do echo == $cp; curl -LO $cp ; done; cd - + +# kerberos client +sed -i "s/EXAMPLE.COM/${KRB_REALM}/g" /etc/krb5.conf --- End diff -- `EXAMPLE.COM` is used in several places, is there any way we can set this in a single place? (for example with search if necessary) ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203972431 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/log4j.properties --- @@ -0,0 +1,354 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Define some default values that can be overridden by system properties +hadoop.root.logger=INFO,console +hadoop.log.dir=. +hadoop.log.file=hadoop.log + +# Define the root logger to the system property "hadoop.root.logger". +log4j.rootLogger=${hadoop.root.logger}, EventCounter + +# Logging Threshold +log4j.threshold=ALL + +# Null Appender +log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender + +# +# Rolling File Appender - cap space usage at 5gb. 
+# +hadoop.log.maxfilesize=256MB +hadoop.log.maxbackupindex=20 +log4j.appender.RFA=org.apache.log4j.RollingFileAppender +log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file} + +log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize} +log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex} + +log4j.appender.RFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# Daily Rolling File Appender +# + +log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender +log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file} + +# Rollover at midnight +log4j.appender.DRFA.DatePattern=.-MM-dd + +log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout + +# Pattern format: Date LogLevel LoggerName LogMessage +log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n +# Debugging Pattern format +#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} (%F:%M(%L)) - %m%n + + +# +# console +# Add "console" to rootlogger above if you want to use this +# + +log4j.appender.console=org.apache.log4j.ConsoleAppender +log4j.appender.console.target=System.err +log4j.appender.console.layout=org.apache.log4j.PatternLayout +log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{2}: %m%n + +# +# TaskLog Appender +# + +#Default values +hadoop.tasklog.taskid=null +hadoop.tasklog.iscleanup=false +hadoop.tasklog.noKeepSplits=4 +hadoop.tasklog.totalLogFileSize=100 +hadoop.tasklog.purgeLogSplits=true +hadoop.tasklog.logsRetainHours=12 + +log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender +log4j.appender.TLA.taskId=${hadoop.tasklog.taskid} +log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup} +log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize} + 
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout +log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n + +# +# HDFS block state change log from block manager +# +# Uncomment the following to log normal block state change +# messages from BlockManager in NameNode. +#log4j.logger.BlockStateChange=DEBUG + +# +#Security appender +# +hadoop.security.logger=INFO,NullAppender +hadoop.security.log.maxfilesize=256MB +hadoop.security.log.maxbackupindex=20 +log4j.category.SecurityLogger=${hadoop.security.logger} +hadoop.security.log.file=SecurityAuth-${user.name}.audit +log4j.appender.RFAS=org.apache.log4j.RollingFileAppender +log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file} +log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout +log4j.appender.RFAS.layout.C
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203972230 --- Diff: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh --- @@ -0,0 +1,104 @@ +#!/usr/bin/env bash + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +set -o pipefail + +source "$(dirname "$0")"/common.sh + +FLINK_TARBALL_DIR=$TEST_DATA_DIR +FLINK_TARBALL=flink.tar.gz +FLINK_DIRNAME=$(basename $FLINK_DIR) + +echo "Flink Tarball directory $FLINK_TARBALL_DIR" +echo "Flink tarball filename $FLINK_TARBALL" +echo "Flink distribution directory name $FLINK_DIRNAME" +echo "End-to-end directory $END_TO_END_DIR" +docker --version +docker-compose --version + +mkdir -p $FLINK_TARBALL_DIR +tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) . + +echo "Building Hadoop Docker container" +until docker build -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t flink/docker-hadoop-secure-cluster:latest $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do +# with all the downloading and ubuntu updating a lot of flakiness can happen, make sure +# we don't immediately fail +echo "Something went wrong while building the Docker image, retrying ..." 
+sleep 2 +done + +echo "Starting Hadoop cluster" +docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up -d + +# make sure we stop our cluster at the end +function cluster_shutdown { + # don't call ourselves again for another signal interruption + trap "exit -1" INT + # don't call ourselves again for normal exit + trap "" EXIT + + docker-compose -f $END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml down + rm $FLINK_TARBALL_DIR/$FLINK_TARBALL +} +trap cluster_shutdown INT +trap cluster_shutdown EXIT + +until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL master:/home/hadoop-user/; do +# we're retrying this one because we don't know yet if the container is ready +echo "Uploading Flink tarball to docker master failed, retrying ..." +sleep 5 +done + +# now, at least the container is ready +docker exec -it master bash -c "tar xzf /home/hadoop-user/$FLINK_TARBALL --directory /home/hadoop-user/" + +docker exec -it master bash -c "echo \"security.kerberos.login.keytab: /home/hadoop-user/hadoop-user.keytab\" >> /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml" +docker exec -it master bash -c "echo \"security.kerberos.login.principal: hadoop-user\" >> /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml" + +echo "Flink config:" +docker exec -it master bash -c "cat /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml" + +# make the output path random, just in case it already exists, for example if we +# had cached docker containers +OUTPUT_PATH=hdfs:///user/hadoop-user/wc-out-$RANDOM + +# it's important to run this with higher parallelism, otherwise we might risk that +# JM and TM are on the same YARN node and that we therefore don't test the keytab shipping +until docker exec -it master bash -c "export HADOOP_CLASSPATH=\`hadoop classpath\` && /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster -yn 3 -ys 1 -ytm 1200 -yjm 800 -p 3 
/home/hadoop-user/$FLINK_DIRNAME/examples/streaming/WordCount.jar --output $OUTPUT_PATH"; do +echo "Running the Flink job failed, might be that the cluster is not ready yet, retrying ..." --- End diff -- is there no way to check whether the cluster is ready? The logs contain several submission failures due to this :/ ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203969501 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/README.md --- @@ -0,0 +1,118 @@ +# Apache Hadoop Docker image with Kerberos enabled + +This image is modified version of Knappek/docker-hadoop-secure + * Knappek/docker-hadoop-secure <https://github.com/Knappek/docker-hadoop-secure> + +With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend it to start a proper kerberized Hadoop cluster: + * Lewuathe/docker-hadoop-cluster <https://github.com/Lewuathe/docker-hadoop-cluster> + +And a lot of added stuff for making this an actual, properly configured, kerberized cluster with proper user/permissions structure. + +Versions + + +* JDK8 +* Hadoop 2.8.3 + +Default Environment Variables +- + +| Name | Value | Description | +| | | | +| `KRB_REALM` | `EXAMPLE.COM` | The Kerberos Realm, more information [here](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html#) | +| `DOMAIN_REALM` | `example.com` | The Kerberos Domain Realm, more information [here](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html#) | +| `KERBEROS_ADMIN` | `admin/admin` | The KDC admin user | +| `KERBEROS_ADMIN_PASSWORD` | `admin` | The KDC admin password | + +You can simply define these variables in the `docker-compose.yml`. + +Run image +- + +Clone the [Github project](https://github.com/aljoscha/docker-hadoop-secure-cluster) and run --- End diff -- point to apache repo instead ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203973291 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile --- @@ -0,0 +1,159 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# +# This image is modified version of Knappek/docker-hadoop-secure +# * Knappek/docker-hadoop-secure <https://github.com/Knappek/docker-hadoop-secure> +# +# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend it to start a proper kerberized Hadoop cluster: +# * Lewuathe/docker-hadoop-cluster <https://github.com/Lewuathe/docker-hadoop-cluster> +# +# Author: Aljoscha Krettek +# Date: 2018 May, 15 +# +# Creates multi-node, kerberized Hadoop cluster on Docker + +FROM sequenceiq/pam:ubuntu-14.04 +MAINTAINER aljoscha + +USER root + +RUN addgroup hadoop +RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs +RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn +RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred + +RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user + +# install dev tools +RUN apt-get update +RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync unzip + +# Kerberos client +RUN apt-get install krb5-user -y +RUN mkdir -p /var/log/kerberos +RUN touch /var/log/kerberos/kadmind.log + +# passwordless ssh +RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa +RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key +RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key +RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa +RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys + +# java +RUN mkdir -p /usr/java/default && \ + curl -Ls 'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz' -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \ + tar --strip-components=1 -xz -C /usr/java/default/ + +ENV JAVA_HOME /usr/java/default +ENV PATH $PATH:$JAVA_HOME/bin + +RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip' +RUN unzip jce_policy-8.zip +RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security + +ENV HADOOP_VERSION=2.8.4 --- End diff -- This potentially uses a different hadoop version than the one against flink-dist was built against. ---
[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6377#discussion_r203973314 --- Diff: flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile --- @@ -0,0 +1,159 @@ + +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +# +# This image is modified version of Knappek/docker-hadoop-secure +# * Knappek/docker-hadoop-secure <https://github.com/Knappek/docker-hadoop-secure> +# +# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend it to start a proper kerberized Hadoop cluster: +# * Lewuathe/docker-hadoop-cluster <https://github.com/Lewuathe/docker-hadoop-cluster> +# +# Author: Aljoscha Krettek +# Date: 2018 May, 15 +# +# Creates multi-node, kerberized Hadoop cluster on Docker + +FROM sequenceiq/pam:ubuntu-14.04 +MAINTAINER aljoscha + +USER root + +RUN addgroup hadoop +RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs +RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn +RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred + +RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user + +# install dev tools +RUN apt-get update +RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync unzip + +# Kerberos client +RUN apt-get install krb5-user -y +RUN mkdir -p /var/log/kerberos +RUN touch /var/log/kerberos/kadmind.log + +# passwordless ssh +RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key /root/.ssh/id_rsa +RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key +RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key +RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa +RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys + +# java +RUN mkdir -p /usr/java/default && \ + curl -Ls 'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz' -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \ + tar --strip-components=1 -xz -C /usr/java/default/ + +ENV JAVA_HOME /usr/java/default +ENV PATH $PATH:$JAVA_HOME/bin + +RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip' +RUN unzip jce_policy-8.zip +RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security + +ENV HADOOP_VERSION=2.8.4 + +# ENV HADOOP_URL https://www.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz --- End diff -- remove ---
[GitHub] flink issue #6370: [FLINK-9894] [runtime] Potential Data Race
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6370 If another PR introduces race conditions, then these race conditions should be resolved in that very PR. ---
[GitHub] flink pull request #6374: [FLINK-9895][tests] Ensure error logging for Netty...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6374 [FLINK-9895][tests] Ensure error logging for NettyLeakDetectionResource ## What is the purpose of the change This PR is a small addition to #6363 to ensure that ERROR logging is enabled for Netty's `ResourceLeakDetector`, as otherwise the leak will not cause test failures. ## Verifying this change * disable error logging in `flink-runtime` for `ResourceLeakDetector`. (see `log4j-test.properties`) * disable auto-release in `FileUploadHandler` * run `FileUploadHandlerTest` You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9895 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6374.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6374 commit 373d6ef65b50de86897a9da6d403982aae59a3d1 Author: zentol Date: 2018-07-19T11:47:53Z [FLINK-9895][tests] Ensure error logging for NettyLeakDetectionResource ---
[GitHub] flink pull request #6371: [FLINK-9871] Use Description class for ConfigOptio...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6371#discussion_r203692344 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java --- @@ -42,22 +45,27 @@ public static final ConfigOption WATCH_HEARTBEAT_INTERVAL = ConfigOptions .key("akka.watch.heartbeat.interval") .defaultValue(ASK_TIMEOUT.defaultValue()) - .withDescription("Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If" + - " TaskManagers are wrongly marked dead because of lost or delayed heartbeat messages, then you should" + - " decrease this value or increase akka.watch.heartbeat.pause. A thorough description of Akka's DeathWatch" + - " can be found http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector\">here."); + .withDescription(Description.builder() + .text("Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If" + --- End diff -- we could think about adding another version of `withDescription` that works like `text`, so that you could write this as ``` withDescription( "Heartbeat interval for Akka's DeathWatch mechanism to detect dead TaskManagers. If" + .. " Akka's DeathWatch can be found %s", link("http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector", "here")); ``` Just a thought. ---
[GitHub] flink pull request #:
Github user zentol commented on the pull request: https://github.com/apache/flink/commit/ec28f92ffd042308494d9661a38ab462738611aa#commitcomment-29761887 In flink-core/src/main/java/org/apache/flink/configuration/Configuration.java: In flink-core/src/main/java/org/apache/flink/configuration/Configuration.java on line 744: An option may be for the current and deprecated key(s) at the same time. ---
[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6363#discussion_r203662639 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java --- @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector; +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory; + +import org.junit.rules.ExternalResource; + +import javax.annotation.concurrent.GuardedBy; + +import static org.junit.Assert.fail; + +/** + * JUnit resource to fail with an assertion when Netty detects a resource leak (only with + * ERROR logging enabled for + * org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector). --- End diff -- Would it be possible to check for this condition by doing something like this in `@Before`? ``` Assert.assertTrue(LoggerFactory.getLogger(org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.class).isErrorEnabled()); ``` ``` ---
[GitHub] flink issue #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHandler
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6363 merging. ---
[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6355#discussion_r203658272 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java --- @@ -61,6 +62,7 @@ private ChannelFuture bindFuture; + private SSLUtils.SSLServerConfiguration serverSSLConfig; --- End diff -- add `@Nullable` ---
[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6355#discussion_r203657904 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java --- @@ -52,6 +55,7 @@ private Bootstrap bootstrap; + private SSLUtils.SSLClientConfiguration clientSSLConfig; --- End diff -- add `@Nullable` ---
[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6355#discussion_r203652194 --- Diff: docs/ops/security-ssl.md --- @@ -33,6 +33,9 @@ SSL can be enabled for all network communication between Flink components. SSL k * **akka.ssl.enabled**: SSL flag for akka based control connection between the Flink client, jobmanager and taskmanager * **jobmanager.web.ssl.enabled**: Flag to enable https access to the jobmanager's web frontend +Please see the configuration page about the +[complete list of SSL configuration parameters]({{site.baseurl}}/ops/config.html#ssl-settings), in particular **security.ssl.session-cache-size**. --- End diff -- just a suggestion, you could also embed the entire table directly, see `Configuration.md` on how to do it. ---
[GitHub] flink pull request #6342: [FLINK-9748][release] Use dedicated directory for ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6342#discussion_r203629175 --- Diff: tools/releasing/create_binary_release.sh --- @@ -44,6 +44,12 @@ else SHASUM="sha512sum" fi +cd .. + +FLINK_DIR=`pwd` +RELEASE_DIR=${RELEASE_DIR}/tools/releasing/release --- End diff -- yes! ---
[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6355#discussion_r203437995 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java --- @@ -160,4 +160,41 @@ key("security.ssl.verify-hostname") .defaultValue(true) .withDescription("Flag to enable peer's hostname verification during ssl handshake."); + + /** +* SSL session cache size. +*/ + public static final ConfigOption SSL_SESSION_CACHE_SIZE = + key("security.ssl.session-cache-size") + .defaultValue(-1) + .withDescription("The size of the cache used for storing SSL session objects. " + + "According to https://github.com/netty/netty/issues/832, you should always set " + + "this to an appropriate number to not run into a bug with stalling IO threads " + + "during garbage collection. (-1 = use system default)."); + + /** +* SSL session timeout. +*/ + public static final ConfigOption SSL_SESSION_TIMEOUT = + key("security.ssl.session-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) for the cached SSL session objects. (-1 = use system default)"); + + /** +* SSL session timeout during handshakes. +*/ + public static final ConfigOption SSL_HANDSHAKE_TIMEOUT = + key("security.ssl.handshake-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) during SSL handshake. (-1 = use system default)"); + + /** +* SSL session timeout after flushing the `close_notify` message. +*/ + public static final ConfigOption SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT = + key("security.ssl.close-notify-flush-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) for flushing the `close_notify` that was triggered by closing a " + --- End diff -- it's not showing up as a code block since that only works for markdown; the description so far was plain-text. ---
[GitHub] flink issue #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHandler
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6363 +1 ---
[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6363#discussion_r203422176 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector; +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory; + +import org.junit.rules.ExternalResource; + +import static org.junit.Assert.fail; + +/** + * JUnit resource to fail with an assertion when Netty detects a resource leak (only with + * ERROR logging enabled for + * org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector). 
+ * + * This should be used in a class rule: + * {@code + * @literal @ClassRule + * public static final NettyLeakDetectionResource LEAK_DETECTION = new NettyLeakDetectionResource(); + * } + */ +public class NettyLeakDetectionResource extends ExternalResource { + private static ResourceLeakDetectorFactory previousLeakDetector; + private static ResourceLeakDetector.Level previousLeakDetectorLevel; + + @Override + protected void before() { + previousLeakDetector = ResourceLeakDetectorFactory.instance(); + previousLeakDetectorLevel = ResourceLeakDetector.getLevel(); + + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new FailingResourceLeakDetectorFactory()); --- End diff -- So this isn't something we necessarily have to deal with now, but if multiple tests that use this resource run in parallel, we might not reset to the correct factory/level. Effectively, what we need is some kind of ref-counting so that only the first resource modifies the level and factory, and only the last resource resets them. :/ ---
[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6363#discussion_r203414529 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -220,4 +224,5 @@ public void testUploadCleanupOnFailure() throws IOException { } MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty(); } + --- End diff -- revert ---
[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6363#discussion_r203414700 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java --- @@ -0,0 +1,84 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.netty; + +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector; +import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory; + +import org.junit.rules.ExternalResource; + +import static org.junit.Assert.fail; + +/** + * JUnit resource to fail with an assertion when Netty detects a resource leak (only with + * ERROR logging enabled for + * org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector). 
+ * + * This should be used in a class rule: + * {@code + * @literal @ClassRule + * public static final NettyLeakDetectionResource LEAK_DETECTION = new NettyLeakDetectionResource(); + * } + */ +public class NettyLeakDetectionResource extends ExternalResource { + private static ResourceLeakDetectorFactory previousLeakDetector; + private static ResourceLeakDetector.Level previousLeakDetectorLevel; + + @Override + protected void before() { + previousLeakDetector = ResourceLeakDetectorFactory.instance(); + previousLeakDetectorLevel = ResourceLeakDetector.getLevel(); + + ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID); + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new FailingResourceLeakDetectorFactory()); + } + + @Override + protected void after() { + ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(previousLeakDetector); + ResourceLeakDetector.setLevel(previousLeakDetectorLevel); + } + + static class FailingResourceLeakDetectorFactory extends ResourceLeakDetectorFactory { --- End diff -- these could be private? ---
[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6352 yay travis is green. ---
[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6363#discussion_r203373280 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java --- @@ -50,6 +55,24 @@ private static final ObjectMapper OBJECT_MAPPER = RestMapperUtils.getStrictObjectMapper(); + private static ResourceLeakDetectorFactory previousLeakDetector; + private static ResourceLeakDetector.Level previousLeakDetectorLevel; + + @BeforeClass + public static void setLeakDetector() { --- End diff -- we could move this into a utility `LeakDetectorResource` so we can re-use it for any test by simply adding it as a `@Rule`. ---
[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6330#discussion_r203361773 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java --- @@ -138,12 +154,22 @@ public JarRunHandler( }); } - private static SavepointRestoreSettings getSavepointRestoreSettings( - final @Nonnull HandlerRequest request) + private SavepointRestoreSettings getSavepointRestoreSettings( + final @Nonnull HandlerRequest request) throws RestHandlerException { - final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false); - final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class); + final JarRunRequestBody requestBody = request.getRequestBody(); + + final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter( + requestBody.getAllowNonRestoredState(), + () -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class), + false, + log); + final String savepointPath = fromRequestBodyOrQueryParameter( --- End diff -- How does this prevent the scenario i described? ---
[GitHub] flink issue #6296: [FLINK-9793] YARN:When submitting a flink job with yarn-c...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6296 merging. ---
[GitHub] flink pull request #6362: [FLINK-9888][release] Remove unsafe defaults from ...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6362 [FLINK-9888][release] Remove unsafe defaults from release scripts ## What is the purpose of the change This PR removes several unnecessary and unsafe `*_VERSION` defaults from the release scripts. The scripts should never be called without explicitly setting these values. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9888 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6362.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6362 commit 93c8f557af888020dca8ae81ae117f4e460f475d Author: zentol Date: 2018-07-18T11:42:06Z [FLINK-9888][release] Remove unsafe defaults from release scripts ---
[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6352 The `YARNHighAvailabilityITCase` still has the same problem. ---
[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6355#discussion_r203326103 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java --- @@ -160,4 +160,41 @@ key("security.ssl.verify-hostname") .defaultValue(true) .withDescription("Flag to enable peer’s hostname verification during ssl handshake."); + + /** +* SSL session cache size. +*/ + public static final ConfigOption SSL_SESSION_CACHE_SIZE = + key("security.ssl.session-cache-size") + .defaultValue(-1) + .withDescription("The size of the cache used for storing SSL session objects. " + + "According to https://github.com/netty/netty/issues/832, you should always set " + + "this to an appropriate number to not run into a bug with stalling IO threads " + + "during garbage collection. (-1 = use system default)."); + + /** +* SSL session timeout. +*/ + public static final ConfigOption SSL_SESSION_TIMEOUT = + key("security.ssl.session-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) for the cached SSL session objects. (-1 = use system default)"); + + /** +* SSL session timeout during handshakes. +*/ + public static final ConfigOption SSL_HANDSHAKE_TIMEOUT = + key("security.ssl.handshake-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) during SSL handshake. (-1 = use system default)"); + + /** +* SSL session timeout after flushing the `close_notify` message. +*/ + public static final ConfigOption SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT = + key("security.ssl.close-notify-flush-timeout") + .defaultValue(-1) + .withDescription("The timeout (in ms) for flushing the `close_notify` that was triggered by closing a " + --- End diff -- Could you try removing the ` signs? Let's see if that trips up the test. ---
[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6330#discussion_r203309580 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +/** + * {@link RequestBody} for running a jar. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JarRunRequestBody implements RequestBody { + + private static final String FIELD_NAME_ENTRY_CLASS = "entryClass"; + private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs"; + private static final String FIELD_NAME_PARALLELISM = "parallelism"; + private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState"; + private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath"; + + @JsonProperty(FIELD_NAME_ENTRY_CLASS) + @Nullable + private String entryClassName; + + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) + @Nullable + private String programArguments; + + @JsonProperty(FIELD_NAME_PARALLELISM) + @Nullable + private Integer parallelism; + + @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) + @Nullable + private Boolean allowNonRestoredState; + + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) + @Nullable + private String savepointPath; + + public JarRunRequestBody() { + this(null, null, null, null, null); + } + + @JsonCreator + public JarRunRequestBody( + @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, + @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState, + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) { --- End diff -- For example, if only a partial body is sent, some fields may be null. I couldn't quickly find a way to allow either all fields or none to be null. ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r203306483 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/Formatter.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Allows providing multiple formatters for the description. E.g. Html formatter, Markdown formatter etc. + */ +public abstract class Formatter { + + private StringBuilder state = new StringBuilder(); + + /** +* Formats the description into a String using format specific tags. 
+* +* @param description description to be formatted +* @return string representation of the description +*/ + public String format(Description description) { + for (BlockElement blockElement : description.getBlocks()) { + blockElement.format(this); + } + return finalizeFormatting(); + } + + public void format(LinkElement element) { + formatLink(state, element.getLink(), element.getText()); + } + + public void format(TextElement element) { + String[] inlineElements = element.getElements().stream().map(el -> { + Formatter formatter = newInstance(); + el.format(formatter); + return formatter.finalizeFormatting(); + } + ).toArray(String[]::new); + formatText(state, escapeFormatPlaceholder(element.getFormat()), inlineElements); + } + + public void format(LineBreakElement element) { + formatLineBreak(state); + } + + public void format(ListElement element) { + String[] inlineElements = element.getEntries().stream().map(el -> { + Formatter formatter = newInstance(); + el.format(formatter); + return formatter.finalizeFormatting(); + } + ).toArray(String[]::new); + formatList(state, inlineElements); + } + + private String finalizeFormatting() { + String result = state.toString(); + state.setLength(0); + return result.replaceAll("%%", "%"); --- End diff -- If a link description contains `%%` this will unintentionally modify that, correct? Granted this is quite an edge-case. ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r203305919 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/Formatter.java --- @@ -0,0 +1,95 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Allows providing multiple formatters for the description. E.g. Html formatter, Markdown formatter etc. + */ +public abstract class Formatter { + + private StringBuilder state = new StringBuilder(); --- End diff -- could be final ---
[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6330#discussion_r203303832 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java --- @@ -138,12 +154,22 @@ public JarRunHandler( }); } - private static SavepointRestoreSettings getSavepointRestoreSettings( - final @Nonnull HandlerRequest request) + private SavepointRestoreSettings getSavepointRestoreSettings( + final @Nonnull HandlerRequest request) throws RestHandlerException { - final boolean allowNonRestoredState = getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false); - final String savepointPath = getQueryParameter(request, SavepointPathQueryParameter.class); + final JarRunRequestBody requestBody = request.getRequestBody(); + + final boolean allowNonRestoredState = fromRequestBodyOrQueryParameter( + requestBody.getAllowNonRestoredState(), + () -> getQueryParameter(request, AllowNonRestoredStateQueryParameter.class), + false, + log); + final String savepointPath = fromRequestBodyOrQueryParameter( --- End diff -- This could result in unexpected NullPointerExceptions when retrieving a primitive, like in the following example: ``` fromRequestBodyOrQueryParameter( requestBody.getParallelism(), () -> getQueryParameter(request, ParallelismQueryParameter.class) log); ``` The explicit default argument prevents that from happening. ---
[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6330#discussion_r203302981 --- Diff: flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java --- @@ -0,0 +1,313 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.configuration.BlobServerOptions; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.dispatcher.DispatcherGateway; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; +import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.rest.handler.HandlerRequest; +import org.apache.flink.runtime.rest.handler.HandlerRequestException; +import org.apache.flink.runtime.rest.messages.MessageParameter; +import org.apache.flink.runtime.rest.messages.MessageQueryParameter; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.util.BlobServerResource; +import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway; +import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever; +import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram; +import org.apache.flink.util.function.SupplierWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; + +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; + +/** + * Tests for the parameter handling of the {@link JarRunHandler}. 
+ */ +public class JarRunHandlerParameterTest { + + @ClassRule + public static final TemporaryFolder TMP = new TemporaryFolder(); + + @ClassRule + public static final BlobServerResource BLOB_SERVER_RESOURCE = new BlobServerResource(); + + private static final AtomicReference lastSubmittedJobGraphReference = new AtomicReference<>(); + private static JarRunHandler handler; + private static Path jarWithManifest; + private static Path jarWithoutManifest; + private static TestingDispatcherGateway restfulGateway; + + @BeforeClass + public static void setup() throws Exception { + Path jarDir = TMP.newFolder().toPath(); + + // properties are set property by surefire plugin + final String parameterProgramJarName = System.getProperty("parameterJarName") + ".jar"; + final String parameterProgramWithoutManifestJarName = System.getProperty("parameterJarWithoutManifestName") + ".jar"; + final Path jarLocation = Paths.get(System.getProperty("targetDir")); + + jarWithManifest = Files.copy( + jarLocation.resolve(parameterProgramJarName), + jarDir.resolve("program-with-manifest.jar")); + jarWithoutManifest = Files.copy( + jarLocation.resolve(parameterProgramWithoutManifestJarName), + jarDir.resolve("program-without-manifest.jar")); + + Configuration config = new Configuration(); + config.setString(BlobServerOptions.STORAGE_DIRECTORY, + TMP.newFolder()
[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6330#discussion_r203302518 --- Diff: flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java --- @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.webmonitor.handlers; + +import org.apache.flink.runtime.rest.messages.RequestBody; + +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; + +import javax.annotation.Nullable; + +/** + * {@link RequestBody} for running a jar. 
+ */ +@JsonInclude(JsonInclude.Include.NON_NULL) +public class JarRunRequestBody implements RequestBody { + + private static final String FIELD_NAME_ENTRY_CLASS = "entryClass"; + private static final String FIELD_NAME_PROGRAM_ARGUMENTS = "programArgs"; + private static final String FIELD_NAME_PARALLELISM = "parallelism"; + private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = "allowNonRestoredState"; + private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath"; + + @JsonProperty(FIELD_NAME_ENTRY_CLASS) + @Nullable + private String entryClassName; + + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) + @Nullable + private String programArguments; + + @JsonProperty(FIELD_NAME_PARALLELISM) + @Nullable + private Integer parallelism; + + @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) + @Nullable + private Boolean allowNonRestoredState; + + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) + @Nullable + private String savepointPath; + + public JarRunRequestBody() { + this(null, null, null, null, null); + } + + @JsonCreator + public JarRunRequestBody( + @JsonProperty(FIELD_NAME_ENTRY_CLASS) String entryClassName, + @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String programArguments, + @JsonProperty(FIELD_NAME_PARALLELISM) Integer parallelism, + @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) Boolean allowNonRestoredState, + @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String savepointPath) { --- End diff -- yes they should be nullable ---
[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6352 will look into them tomorrow ---
[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6352 the test failures may highlight tests that weren't shutting down the last application properly; previously this would've succeeded since the check was done in `@Before`. ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r203074481 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. 
+ */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- OK, that makes sense. Do we maybe want to handle this in `Formatter` already? ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r203069199 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. 
+ */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- and what is the actual issue? ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r203069177 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. 
+ */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; --- End diff -- and my point is exactly to prevent bugs from masking each other. The placeholder was safe to use since we could be reasonably sure it would not be present in the input. Once 2 entities use the same placeholder, with access to the same input, this is no longer the case. ---
[GitHub] flink pull request #6352: [FLINK-8163][yarn][tests] Harden tests against slo...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6352 [FLINK-8163][yarn][tests] Harden tests against slow job shutdowns ## What is the purpose of the change This PR hardens the `YarnTestBase` against jobs that just don't want to shut down that quickly (i.e. within 500ms). The maximum waiting time has been increased to 10 seconds, during which we periodically check the state of all applications. Additionally, the failure condition from `@Before` was moved to the `@After` method. This change will allow us to better differentiate between simple timing issues and unsuccessful job shutdowns. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 8163 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6352.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6352 commit 0dd65378f0c9f477bb8f5712bbc0b1f31440f5f0 Author: zentol Date: 2018-07-17T11:29:16Z [FLINK-8163][yarn][tests] Harden tests against slow job shutdowns ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6347 yes ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202970121 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. 
+ */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; + + private static String escapeCharacters(String value) { + return value + .replaceAll("%s", TEMPORARY_PLACEHOLDER) --- End diff -- can you give me an example for where this is problematic? Does this occur if the _final formatted_ description contains `%`? ---
[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6312#discussion_r202969620 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java --- @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.configuration.description; + +/** + * Formatter that transforms {@link Description} into Html representation. 
+ */ +public class HtmlFormatter extends Formatter { + + @Override + protected void formatLink(StringBuilder state, String link, String description) { + state.append(String.format("%s", link, description)); + } + + @Override + protected void formatLineBreak(StringBuilder state) { + state.append(""); + } + + @Override + protected void formatText(StringBuilder state, String format, String[] elements) { + String escapedFormat = escapeCharacters(format); + state.append(String.format(escapedFormat, elements)); + } + + @Override + protected void formatList(StringBuilder state, String[] entries) { + state.append(""); + for (String entry : entries) { + state.append(String.format("%s", entry)); + } + state.append(""); + } + + @Override + protected Formatter newInstance() { + return new HtmlFormatter(); + } + + private static final String TEMPORARY_PLACEHOLDER = "superRandomTemporaryPlaceholder"; --- End diff -- this should be distinct from the placeholder in `Utils`. ---
[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6294#discussion_r202968456 --- Diff: flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java --- @@ -53,6 +55,16 @@ public static TextElement text(String text) { return new TextElement(text, Collections.emptyList()); } + /** +* Tries to format the text as code. +* +* @return text element with applied formatting +*/ + public TextElement formatAsCode() { --- End diff -- alternatively we could add an explicit `Code` `InlineElement`. ---
[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6350#discussion_r202962522 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java --- @@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint( Execution[] executions = new Execution[tasksToTrigger.length]; for (int i = 0; i < tasksToTrigger.length; i++) { Execution ee = tasksToTrigger[i].getCurrentExecutionAttempt(); - if (ee != null && ee.getState() == ExecutionState.RUNNING) { - executions[i] = ee; - } else { + if (ee == null) { --- End diff -- According to the `ExecutionVertex` docs this branch shouldn't be necessary at all, but I kept it in to be safe. ---
[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6350 [FLINK-9873][runtime] Log task state when aborting checkpoint ## What is the purpose of the change This PR adjusts the logging message for when a checkpoint is declined due to tasks not being ready. We now explicitly log the current task state. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9873 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6350.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6350 commit 4dfe622fc45a2233dbba58640d6aa67be4739f86 Author: zentol Date: 2018-07-17T07:34:45Z [FLINK-9873][runtime] Log task state when aborting checkpoint ---
[GitHub] flink pull request #6349: [FLINK-6997][tests] Properly cancel test job
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6349 [FLINK-6997][tests] Properly cancel test job ## What is the purpose of the change With this PR the jobs started in `SavepointITCase#testSavepointForJobWithIteration` are properly canceled. Previously they remained in a running state until the cluster was shut down, causing several exceptions to be logged. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9872 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6349.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6349 commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc Author: zentol Date: 2018-07-17T09:34:29Z [FLINK-6997][tests] Properly cancel test job ---
[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6347 `vertices` is the correct plural, but this is another one of those cases where fixing it might cause more harm than good since it could cause merge conflicts, yet provides no functional benefit. Additionally this PR makes a lot of whitespace changes that should be reverted in any case. ---
[GitHub] flink pull request #6342: [FLINK-9748][release] Use dedicated directory for ...
GitHub user zentol opened a pull request: https://github.com/apache/flink/pull/6342 [FLINK-9748][release] Use dedicated directory for release artifacts ## What is the purpose of the change With this PR artifacts created during the release process are no longer placed in the root flink directory, but instead in a dedicated directory under `/tools/releasing`. This makes it easier to reset the repository state in case of an error, as all you have to do is remove said directory. It also prevents accidentally committing release files. In case of success this directory will contain all release artifacts that should be uploaded. Additionally this PR introduces variables for commonly used directories (flink root directory, release directory, flink-clone directory) and reduces usages of relative paths. ## Brief change log * modified source/binary release scripts to use a dedicated directory for storing release artifacts * modified rat-plugin to exclude release directory * modified .gitignore to exclude release directory ## Verifying this change Manually verified. @aljoscha @tillrohrmann I'd appreciate your input. You can merge this pull request into a Git repository by running: $ git pull https://github.com/zentol/flink 9748 Alternatively you can review and apply these changes as the patch at: https://github.com/apache/flink/pull/6342.patch To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #6342 commit 92bf5ce764dafa82fcdc2ad3c625d194979c76d9 Author: zentol Date: 2018-07-16T13:16:19Z [FLINK-9748][release] Use dedicated directory for release artifacts ---
[GitHub] flink issue #6327: [FLINK-9839][e2e] add end-to-end tests with SSL enabled
Github user zentol commented on the issue: https://github.com/apache/flink/pull/6327 merging. ---
[GitHub] flink pull request #6340: [FLINK-9842][rest] Pass actual configuration to Bl...
Github user zentol commented on a diff in the pull request: https://github.com/apache/flink/pull/6340#discussion_r202655858 --- Diff: flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java --- @@ -57,24 +60,39 @@ /** * Tests for the {@link JobSubmitHandler}. */ +@RunWith(Parameterized.class) public class JobSubmitHandlerTest extends TestLogger { + @Parameterized.Parameters(name = "SSL enabled: {0}") + public static Iterable data() { + return Arrays.asList(true, false); + } + @ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder(); - private static BlobServer blobServer; - @BeforeClass - public static void setup() throws IOException { - Configuration config = new Configuration(); + private final Configuration sslConfig; --- End diff -- true, I'll give it a more generic name while merging. ---