[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205106925
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
@@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception 
{
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+   boolean infinite = params.getBoolean("infinite", false);
 
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
int numKeys = loadFactor * 128 * 1024;
-   DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
+   DataSet<Tuple2<String, Integer>> x1Keys;
DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
 
+   if (infinite) {
+   x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0);
--- End diff --

This now filters out all data.
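
For intuition: assuming the infinite generator only ever emits dummy records with `f1 == -1` (the behavior the quoted Javadoc describes), the predicate `t.f1 >= 0` rejects every record. A stand-in illustration in plain Java, not the PR's code:

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class FilterAllOut {
	public static void main(String[] args) {
		// stand-in for the generated records: every dummy element carries the value -1
		List<Integer> values = Stream.generate(() -> -1).limit(5).collect(Collectors.toList());
		// the predicate from the diff, t.f1 >= 0, keeps none of them
		long kept = values.stream().filter(v -> v >= 0).count();
		System.out.println(kept); // prints 0
	}
}
```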


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205108213
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
--- End diff --

outdated


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205108561
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+   // total number of records
+   private final long numRecords;
+   // total number of keys
+   private final long numKeys;
+
+   // records emitted per partition
+   private long recordsPerPartition;
+   // number of keys per partition
+   private long keysPerPartition;
+
+   // number of currently emitted records
+   private long recordCnt;
+
+   // id of current partition
+   private int partitionId;
+
+   private final boolean infinite;
+
+   public Generator(long numKeys, int recordsPerKey) {
+   this(numKeys, recordsPerKey, false);
+   }
+
+   private Generator(long numKeys, int recordsPerKey, boolean infinite) {
+   this.numKeys = numKeys;
+   this.numRecords = numKeys * recordsPerKey;
+   this.infinite = infinite;
+   }
+
+   public static Generator infinite() {
+   return new Generator(Long.MAX_VALUE, 1, true);
+   }
+
+   @Override
+   public void configure(Configuration parameters) { }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+   return null;
+   }
+
+   @Override
+   public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+   GenericInputSplit[] splits = new 
GenericInputSplit[minNumSplits];
+   for (int i = 0; i < minNumSplits; i++) {
+   splits[i] = new GenericInputSplit(i, minNumSplits);
+   }
+   return splits;
+   }
+
+   @Override
+   public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] 
inputSplits) {
+   return new DefaultInputSplitAssigner(inputSplits);
+   }
+
+   @Override
+   public void open(GenericInputSplit split) throws IOException {
+   this.partitionId = split.getSplitNumber();
+   // total number of partitions
+   int numPartitions = split.getTotalNumberOfSplits();
+
+   // ensure even distribution of records and keys
+   Preconditions.checkArgument(
+   numRecords % numPartitions == 0,
+   "Records cannot be evenly distributed among 
partitions");
+   Preconditions.checkArgument(
+   numKeys % numPartitions == 0,
+   "Keys cannot be evenly distributed among partitions");
+
+   this.recordsPerPartition = numRecords / numPartitions;
+   this.keysPerPartition = numKeys / numPartitions;
+
+   this.recordCnt = 0;
+   }
+
+   @Override
+   public boolean reachedEnd
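
The quoted diff is truncated at `reachedEnd`; a plausible shape for the remaining `InputFormat` methods, inferred from the fields and Javadoc above — a sketch, not the PR's actual code:

```java
@Override
public boolean reachedEnd() throws IOException {
	// an infinite generator never reports end-of-input
	return !infinite && recordCnt >= recordsPerPartition;
}

@Override
public Tuple2<String, Integer> nextRecord(Tuple2<String, Integer> reuse) throws IOException {
	// keys repeat within this partition's key range; values stay in [0, 127]
	long key = (recordCnt % keysPerPartition) + partitionId * keysPerPartition;
	reuse.f0 = "key-" + key;
	reuse.f1 = (int) (recordCnt % 128);
	recordCnt++;
	return reuse;
}

@Override
public void close() throws IOException { }
```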

[GitHub] flink pull request #6407: [FLINK-9877][docs] Add documentation page for diff...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6407#discussion_r205097411
  
--- Diff: docs/dev/stream/operators/joining.md ---
@@ -0,0 +1,286 @@
+---
+title: "Joining"
+nav-id: streaming_joins
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 10
+---
+
+
+* toc
+{:toc}
+
+# Window Join
+A window join will join the elements of two streams that share a common 
key and lie in the same window. These windows can be defined by using a [window 
assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) 
and are evaluated on a union of both streams. This is especially important for 
session window joins, which will be demonstrated below.
+
+The joined elements are then passed to a user-defined `JoinFunction` or 
`FlatJoinFunction` where the user can perform transformations on the joined 
elements.
+
+The general usage always looks like the following:
+
+```java
+stream.join(otherStream)
+.where(<KeySelector>)
+.equalTo(<KeySelector>)
+.window(<WindowAssigner>)
+.apply(<JoinFunction>)
+```
+
+Some notes on semantics:
+- The creation of pairwise combinations of elements of the two streams 
behaves like an inner-join, meaning elements from one stream will not be 
emitted if they don't have a corresponding element from the other stream to be 
joined with.
+- Those elements that do get joined will have as their timestamp the 
largest timestamp that still lies in the respective window. For example a 
window with `[5, 10)` as its boundaries would result in the joined elements 
having nine as their timestamp.
+
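
A self-contained sketch of this pattern with a 2 ms tumbling event-time window (stream contents and names are illustrative, not part of the documentation page under review):

```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class WindowJoinSketch {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

		DataStream<Tuple2<String, Integer>> orange = withTimestamps(
			env.fromElements(Tuple2.of("k", 0), Tuple2.of("k", 1), Tuple2.of("k", 6)));
		DataStream<Tuple2<String, Integer>> green = withTimestamps(
			env.fromElements(Tuple2.of("k", 0), Tuple2.of("k", 3)));

		// inner join per key within tumbling windows [0,1], [2,3], ...
		// the lone element at timestamp 6 finds no partner and is not emitted
		orange.join(green)
			.where(keyOf())
			.equalTo(keyOf())
			.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
			.apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
				@Override
				public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
					return first.f1 + "," + second.f1;
				}
			})
			.print();

		env.execute("Window join sketch");
	}

	private static KeySelector<Tuple2<String, Integer>, String> keyOf() {
		return new KeySelector<Tuple2<String, Integer>, String>() {
			@Override
			public String getKey(Tuple2<String, Integer> t) {
				return t.f0;
			}
		};
	}

	// interpret the integer field as the event timestamp (milliseconds)
	private static DataStream<Tuple2<String, Integer>> withTimestamps(DataStream<Tuple2<String, Integer>> in) {
		return in.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
			@Override
			public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
				return element.f1;
			}
		});
	}
}
```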
+In the following section we are going to give an overview of how 
different kinds of windows can be used for a window join and what the results 
of those joins would look like using exemplary scenarios.
+
+## Tumbling Window
+When performing a tumbling window join, all elements with a common key and 
a common tumbling window are joined as pairwise combinations and passed on to 
the user-defined function. Because this behaves like an inner join, elements of 
one stream that do not have elements from another stream in their tumbling 
window are not emitted!
+
+### Example
+
+
+In our example we are defining a tumbling window with the size of 2 
milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The 
image shows the pairwise combinations of all elements in each window which will 
be passed on to the user-defined function. You can also see how in the tumbling 
window `[6,7]` nothing is emitted because no elements from the green stream 
exist to be joined with the orange elements ⑥ and ⑦.
+
+
+
+
+```java
--- End diff --

please use {% highlight java %} syntax


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205087147
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do
[...] >> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

An infinite flag should be sufficient.


---


[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...

2018-07-25 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6378
  
merging.


---


[GitHub] flink issue #6402: [FLINK-9914][docs] Update Docker docs

2018-07-25 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6402
  
merging.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205081350
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do
[...] >> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

But the loop executes `wait_job_running ${JOB_ID}`, so don't we know it 
_actually_ restarted?


---


[GitHub] flink issue #6409: [FLINK-9899][Kinesis Connecotr] Add comprehensive per-sha...

2018-07-25 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6409
  
New metrics should be documented in 
https://ci.apache.org/projects/flink/flink-docs-master/monitoring/metrics.html#kinesis-connectors.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205078058
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do
[...] >> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

We've already verified that the job is properly restarted in the 
jobmanager-kill loop, so this should only be about shutting down the job.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205069073
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
@@ -66,14 +59,21 @@ public static void main(String[] args) throws Exception 
{
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+   String source = params.get("source", null);
--- End diff --

source parameter is not documented in the javadocs
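
A sketch of the kind of Javadoc addition being requested (parameter names come from the quoted diffs; the descriptions are guesses, not the PR's wording):

```java
/**
 * All-round DataSet test program.
 *
 * <p>Parameters:
 * <ul>
 *   <li>{@code --loadFactor} (int, required): scales the number of generated keys.
 *   <li>{@code --outputPath} (String, required): where the final result is written.
 *   <li>{@code --source} (String, optional): the undocumented entry this review asks for.
 * </ul>
 */
public final class DataSetAllroundTestProgramDoc { }
```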


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070681
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
--- End diff --

same as below


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070467
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070430
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
--- End diff --

move into shared `common-ha.sh`


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205072007
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do
[...] >> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

Couldn't we simply cancel the job?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070024
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+   // total number of records
+   private final long numRecords;
+   // total number of keys
+   private final long numKeys;
+
+   // records emitted per partition
+   private long recordsPerPartition;
+   // number of keys per partition
+   private long keysPerPartition;
+
+   // number of currently emitted records
+   private long recordCnt;
+
+   // id of current partition
+   private int partitionId;
+
+   private final String path;
+
+   private boolean hasStarted = true;
+
+   public Generator(long numKeys, int recordsPerKey) {
+   this(numKeys, recordsPerKey, null);
+   }
+
+   public Generator(long numKeys, int recordsPerKey, String controlPath) {
+   this.numKeys = numKeys;
+   this.numRecords = numKeys * recordsPerKey;
+   this.path = controlPath;
+   }
+
+   @Override
+   public void configure(Configuration parameters) { }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+   return null;
+   }
+
+   @Override
+   public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+   GenericInputSplit[] splits = new 
GenericInputSplit[minNumSplits];
+   for (int i = 0; i < minNumSplits; i++) {
+   splits[i] = new GenericInputSplit(i, minNumSplits);
+   }
+   return splits;
+   }
+
+   @Override
+   public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] 
inputSplits) {
+   return new DefaultInputSplitAssigner(inputSplits);
+   }
+
+   @Override
+   public void open(GenericInputSplit split) throws IOException {
+   this.partitionId = split.getSplitNumber();
+   // total number of partitions
+   int numPartitions = split.getTotalNumberOfSplits();
+
+   // ensure even distribution of records and keys
+   Preconditions.checkArgument(
+   numRecords % numPartitions == 0,
+   "Records cannot be evenly distributed among 
partitions");
+   Preconditions.checkArgument(
+   numKeys % numPartitions == 0,
+   "Keys cannot be evenly distributed among partitions");
+
+   this.recordsPerPartition = numRecords / numPartitions;
+   this.keysPerPartition = numKeys / numPartitions;
+
+   this.recordCnt = 0;
+
+   if (this.path != null) {
 

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205071341
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c<MISSING_JMS; c++ )); do

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070634
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
--- End diff --

move into shared `common-ha.sh`, parameterizing the strings to search for so 
it's also applicable to the streaming test.


---


[GitHub] flink pull request #6416: [FLINK-9942][rest] Guard handlers against null fie...

2018-07-25 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6416

[FLINK-9942][rest] Guard handlers against null fields

## What is the purpose of the change

This PR prevents some NPEs that could arise when fields in the request 
are set to null or omitted.
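
The pattern involved is simply defaulting optional request fields before use; a minimal illustration (the names are made up, not the handler code from this PR):

```java
final class RequestDefaults {

	// return the deserialized field, or a fallback if the JSON field was null or omitted
	static <T> T orDefault(T value, T fallback) {
		return value != null ? value : fallback;
	}

	public static void main(String[] args) {
		String parallelism = orDefault(null, "1");
		System.out.println(parallelism); // "1" instead of an NPE further down the line
	}
}
```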

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9942

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6416


commit 3e9046864d566ef6aa0d9af7a09771ebb5fe556b
Author: zentol 
Date:   2018-07-25T06:39:29Z

[FLINK-9942][rest] Guard handlers against null fields




---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6406#discussion_r204820593
  
--- Diff: flink-docs/README.md ---
@@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full 
reference of the REST A
 To integrate a new endpoint into the generator
 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that 
extends the new endpoint class
 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main`
-3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu`
+3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests`
--- End diff --

ah wait a second, this would run the test for all modules. urgh...


---


[GitHub] flink issue #6396: [FLINK-9806][docs] Add canonical link element to docs

2018-07-24 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6396
  
we can keep the hotfix in this PR but it shouldn't be squashed into the 
main commit as it is unrelated.


---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6406#discussion_r204819304
  
--- Diff: flink-docs/README.md ---
@@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full 
reference of the REST A
 To integrate a new endpoint into the generator
 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that 
extends the new endpoint class
 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main`
-3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu`
+3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests`
--- End diff --

I intentionally left this option out. This guarantees the correctness of 
the output since it both tests the generator before the generation, and the 
completeness afterwards. There's a lot of additional options one _could_ add, 
but I'd prefer if we stick to the _necessary_ ones.


---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204758822
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -28,15 +35,21 @@
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
  * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
- * {@code /metrics?get=X,Y}
+ * {@code /metrics?get=X,Y OR /metrics?get=X,Y&=0-4,7-10}
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  *
  * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and 
is only kept for backwards-compatibility.
  */
 @Deprecated
 public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+   private final Logger log = LoggerFactory.getLogger(getClass());
--- End diff --

either add this as a protected field to the `AbstractMetricsHandler`, or 
change it to being static.
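
The second option, sketched (class name borrowed from the diff; body elided to a single call):

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class JobVertexMetricsHandlerSketch {
	// static: one logger per class instead of a new Logger per handler instance
	private static final Logger LOG = LoggerFactory.getLogger(JobVertexMetricsHandlerSketch.class);

	void handle() {
		LOG.debug("handling request");
	}
}
```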


---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204757730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -57,4 +70,77 @@ public JobVertexMetricsHandler(Executor executor, 
MetricFetcher fetcher) {
? task.metrics
: null;
}
+
+   @Override
+   protected String getRequestMetricsList(Map queryParams) 
{
+   String metricRequestsList = queryParams.get(PARAMETER_METRICS);
+   String subtasksList = queryParams.get(SUB_TASKS);
+   if (subtasksList == null || subtasksList.isEmpty()) {
+   return queryParams.get(PARAMETER_METRICS);
--- End diff --

return `super.getRequestMetricsList(queryParams)` instead


---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204759313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -28,15 +35,21 @@
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
  * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
- * {@code /metrics?get=X,Y}
+ * {@code /metrics?get=X,Y OR /metrics?get=X,Y&=0-4,7-10}
--- End diff --

should only be a single `&`


---


[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6388#discussion_r204750473
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -96,6 +96,8 @@ DEFAULT_ENV_JAVA_OPTS=""# 
Optional JVM args
 DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args 
(JobManager)
 DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args 
(TaskManager)
 DEFAULT_ENV_SSH_OPTS="" # Optional SSH 
parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR=""# YARN Configuration Directory, if 
necessary
--- End diff --

indentation?


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720999
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -297,7 +300,13 @@ public void testFailingDataSinkTask() {
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
 
-   super.registerFileOutputTask(MockFailingOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721547
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
 ---
@@ -45,33 +46,30 @@
 
 public class DataSourceTaskTest extends TaskTestBase {
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
private List outList;
-   
-   private String tempTestPath = 
DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test");
-   
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";

@Test
public void testDataSourceTask() {
int keyCnt = 100;
int valCnt = 20;

this.outList = new ArrayList();
-   
+   File tempTestFile = null;
try {
-   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false), 
-   this.tempTestPath, true);
+   tempTestFile = tempFolder.newFile(tempTestFileName);
+   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false),
+   tempTestFile, true);
} catch (IOException e1) {
+   System.err.println(e1);
--- End diff --

let's change the method signature to throw `IOException` and remove the 
try/catch block.
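
The suggested shape, as a sketch (the test body is elided to a comment):

```java
import java.io.File;
import java.io.IOException;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

public class DataSourceTaskSketchTest {

	@Rule
	public final TemporaryFolder tempFolder = new TemporaryFolder();

	// declaring IOException lets JUnit report the failure directly,
	// with no try/catch boilerplate in the test body
	@Test
	public void testDataSourceTask() throws IOException {
		File tempTestFile = tempFolder.newFile("dst_test");
		// ... prepare the input file and run the task against tempTestFile ...
	}
}
```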


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721015
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -337,7 +345,13 @@ public void testFailingSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockFailingOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721035
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -363,7 +376,9 @@ public void testCancelDataSinkTask() throws Exception {
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
 
-   super.registerFileOutputTask(MockOutputFormat.class,  new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720914
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -79,12 +75,11 @@ public void testDataSinkTask() {
 
DataSinkTask testTask = new 
DataSinkTask<>(this.mockEnv);
 
-   super.registerFileOutputTask(MockOutputFormat.class, 
new File(tempTestPath).toURI().toString());
+   File tempTestFile = 
tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721046
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -419,7 +432,13 @@ public void testCancelSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockOutputFormat.class,  new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721390
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
 ---
@@ -45,33 +46,30 @@
 
 public class DataSourceTaskTest extends TaskTestBase {
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
private List outList;
-   
-   private String tempTestPath = 
DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test");
-   
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";

@Test
public void testDataSourceTask() {
int keyCnt = 100;
int valCnt = 20;

this.outList = new ArrayList();
-   
+   File tempTestFile = null;
try {
-   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false), 
-   this.tempTestPath, true);
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720983
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -225,7 +224,13 @@ public void testSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204718919
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ---
@@ -481,10 +479,6 @@ public void 
testCallsForwardedToNonPartitionedBackend() throws Exception {
env.getTaskKvStateRegistry());
}
 
-   static Environment getMockEnvironment() {
-   return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
-   }
-
static Environment getMockEnvironment(File[] tempDirs) {
--- End diff --

we could change this to a vararg method; then we could simplify calls like 
`getMockEnvironment(new File[] { tempFolder.newFolder() });`.
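    
The suggested signature change, sketched with a stand-in return type (the real method returns Flink's `Environment`):

```java
import java.io.File;

class MockEnvironments {

	// varargs: callers can pass zero or more dirs without building an array
	static File[] getMockEnvironmentDirs(File... tempDirs) {
		return tempDirs;
	}

	public static void main(String[] args) {
		// call site simplifies from getMockEnvironment(new File[] { dir }) to:
		File[] dirs = getMockEnvironmentDirs(new File("/tmp/localRecovery"));
		System.out.println(dirs.length);
	}
}
```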


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204722139
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 ---
@@ -240,36 +205,19 @@ public void testOnlyLevel2NestedDirectories() {
 */
@Test
public void testTwoNestedDirectoriesWithFilteredFilesTrue() {
-
-   String sep = System.getProperty("file.separator");
-
try {
String firstLevelDir = TestFileUtils.randomFileName();
String secondLevelDir = TestFileUtils.randomFileName();
String thirdLevelDir = TestFileUtils.randomFileName();
String secondLevelFilterDir = 
"_"+TestFileUtils.randomFileName();
String thirdLevelFilterDir = 
"_"+TestFileUtils.randomFileName();
 
-   File nestedDir = new File(tempPath + sep + 
firstLevelDir);
-   nestedDir.mkdirs();
-   nestedDir.deleteOnExit();
-
-   File insideNestedDir = new File(tempPath + sep + 
firstLevelDir + sep + secondLevelDir);
-   insideNestedDir.mkdirs();
-   insideNestedDir.deleteOnExit();
-   File insideNestedDirFiltered = new File(tempPath + sep 
+ firstLevelDir + sep + secondLevelFilterDir);
-   insideNestedDirFiltered.mkdirs();
-   insideNestedDirFiltered.deleteOnExit();
-   File filteredFile = new File(tempPath + sep + 
firstLevelDir + sep + "_IWillBeFiltered");
-   filteredFile.createNewFile();
-   filteredFile.deleteOnExit();
-
-   File nestedNestedDir = new File(tempPath + sep + 
firstLevelDir + sep + secondLevelDir + sep + thirdLevelDir);
-   nestedNestedDir.mkdirs();
-   nestedNestedDir.deleteOnExit();
-   File nestedNestedDirFiltered = new File(tempPath + sep 
+ firstLevelDir + sep + secondLevelDir + sep + thirdLevelFilterDir);
-   nestedNestedDirFiltered.mkdirs();
-   nestedNestedDirFiltered.deleteOnExit();
+   File nestedNestedDirFiltered = 
tempFolder.newFolder(firstLevelDir, secondLevelDir, thirdLevelDir, 
thirdLevelFilterDir);
+   File nestedNestedDir = 
nestedNestedDirFiltered.getParentFile();
+   File insideNestedDir = nestedNestedDir.getParentFile();
+   File nestedDir = insideNestedDir.getParentFile();
+   File insideNestedDirFiltered = 
tempFolder.newFolder(firstLevelDir, secondLevelFilterDir);
+   new File(nestedDir, "_IWillBeFiltered");
--- End diff --

This file isn't explicitly created anymore.
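
The distinction the comment is pointing at, as a minimal sketch:

```java
import java.io.File;
import java.io.IOException;

class CreateNewFileSketch {
	public static void main(String[] args) throws IOException {
		File dir = new File(System.getProperty("java.io.tmpdir"));
		// constructing a File is just a path; nothing appears on disk
		File filteredFile = new File(dir, "_IWillBeFiltered");
		System.out.println(filteredFile.exists()); // false
		// the call the refactoring dropped: this actually creates the file
		filteredFile.createNewFile();
		System.out.println(filteredFile.exists()); // true
	}
}
```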


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204719730
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 ---
@@ -127,7 +132,7 @@ public void testSlotAllocation() throws Exception {
TestingUtils.infiniteTime());
 
final File[] taskExecutorLocalStateRootDirs =
-   new File[]{new 
File(System.getProperty("java.io.tmpdir"), "localRecovery")};
+   new File[]{ tempFolder.newFolder("localRecovery") };
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720974
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -141,7 +136,13 @@ public void testUnionDataSinkTask() {
 
DataSinkTask testTask = new 
DataSinkTask<>(this.mockEnv);
 
-   super.registerFileOutputTask(MockOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204719565
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
 ---
@@ -218,7 +224,7 @@ public void TestAnonymousClass() throws IOException {
 
@Test
public void TestExtendIdentifier() throws IOException {
-   File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
+   File out = tempFolder.newFile("jarcreatortest.jar");
--- End diff --

to preserve the existing behavior of passing a file that does not exist, do 
this instead:
```
File out = new File(tempFolder.getRoot(), "jarcreatortest.jar");
```

Also applies to the other tests in this file.
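
The difference in a nutshell (a sketch assuming JUnit 4's `TemporaryFolder` 
rule; the test class name is made up):

```
import java.io.File;

import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;

import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

public class TempFolderBehaviorSketch {

    @Rule
    public TemporaryFolder tempFolder = new TemporaryFolder();

    @Test
    public void newFileVsPlainFile() throws Exception {
        // newFile() creates the file on disk right away
        File created = tempFolder.newFile("created.jar");
        assertTrue(created.exists());

        // new File(...) only constructs a path; nothing exists yet, which
        // preserves the previous "output file does not exist" behavior
        File pending = new File(tempFolder.getRoot(), "jarcreatortest.jar");
        assertFalse(pending.exists());
    }
}
```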


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -49,22 +50,17 @@
 import static org.junit.Assert.assertTrue;
 
 public class DataSinkTaskTest extends TaskTestBase {
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();

private static final Logger LOG = 
LoggerFactory.getLogger(DataSinkTaskTest.class);
 
private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
-   private final String tempTestPath = 
constructTestPath(DataSinkTaskTest.class, "dst_test");
-
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";
--- End diff --

since this was only used to prevent name clashes between tests it should 
now be redundant and can be removed.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720065
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 ---
@@ -338,22 +279,11 @@ public void testGetStatisticsMultipleNestedFiles() {
String secondLevelDir = TestFileUtils.randomFileName();
String secondLevelDir2 = TestFileUtils.randomFileName();
 
-   File nestedDir = new File(tempPath + 
System.getProperty("file.separator") 
-   + firstLevelDir);
-   nestedDir.mkdirs();
-   nestedDir.deleteOnExit();
+   File insideNestedDir = 
tempFolder.newFolder(firstLevelDir, secondLevelDir);
+   File insideNestedDir2 = 
tempFolder.newFolder(firstLevelDir, secondLevelDir2);
+   File nestedDir = insideNestedDir.getParentFile();
 
-   File insideNestedDir = new File(tempPath + 
System.getProperty("file.separator") 
-   + firstLevelDir + 
System.getProperty("file.separator") + secondLevelDir);
-   insideNestedDir.mkdirs();
-   insideNestedDir.deleteOnExit();
-
-   File insideNestedDir2 = new File(tempPath + 
System.getProperty("file.separator")
-   + firstLevelDir + 
System.getProperty("file.separator") + secondLevelDir2);
-   insideNestedDir2.mkdirs();
-   insideNestedDir2.deleteOnExit();
-
-   // create a file in the first-level and two files in 
the nested dir
+   // create a file in the first-level and two 
files in the nested dir
--- End diff --

revert indentation change


---


[GitHub] flink issue #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#testWat...

2018-07-24 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6398
  
well, that's a fairly general statement, isn't it :/ 

Unless we find any mention anywhere of a major JVM that is suited for Flink 
applications not adhering to the JLS in this regard, I would go with a 
volatile long.

Personally I never take inspections at face value; we don't know how 
up-to-date this inspection is or what information it is based on, nor which 
JVMs it applies to, as it doesn't mention even a single example, and as with 
all JVM-/environment-specific inspections the applicability is questionable.
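
For reference, a minimal sketch of the pattern under discussion (class and 
field names are illustrative, not Flink code): per JLS §17.7, reads and 
writes of `volatile long` fields are atomic on any conforming JVM.

```
// Illustrative sketch only: shows the JLS §17.7 guarantee for volatile longs.
public class WatermarkHolder {

    // volatile guarantees atomic 64-bit reads/writes plus visibility
    private volatile long lastWatermark = Long.MIN_VALUE;

    // called from the writer thread
    public void update(long watermark) {
        lastWatermark = watermark;
    }

    // a reader thread always sees a complete 64-bit value, never a torn one
    public long get() {
        return lastWatermark;
    }
}
```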


---


[GitHub] flink issue #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#testWat...

2018-07-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6398
  
Do you have a source for volatile longs not being supported in all JVM 
implementations? I'm asking since the (expected) behavior is defined in the 
[Java Language 
Specification](https://docs.oracle.com/javase/specs/jls/se7/html/jls-17.html#jls-17.7).


---


[GitHub] flink issue #6388: [FLINK-6222] Allow passing env variables to start scripts...

2018-07-23 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6388
  
If a feature isn't visibly documented chances are no one will use it ;)

I'm not sure if the configuration page is the right place to put it, as it 
so far deals exclusively with settings set in `flink-conf.yaml`. Most 
notably, this line in the introduction sticks out:

```
All configuration is done in conf/flink-conf.yaml, which is expected to be 
a flat collection of YAML key value pairs with format key: value.
```

You could name it `flink-client-env-sh`; that would make it more obvious 
that it only applies to the client.
However I have to ask, why a separate file in the first place? We already 
have config options for setting environment variables (`env.java.opts`); 
couldn't we introduce a separate option for clients?


---


[GitHub] flink pull request #6395: [FLINK-9900][tests] Harden ZooKeeperHighAvailabili...

2018-07-23 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6395

[FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase

## What is the purpose of the change

This PR makes a few modifications to the `ZooKeeperHighAvailabilityITCase` 
to reduce the chances for intermittent test failures and timeouts.

Changes:
## 1)
The test was moving files out of the HA storage directory with a simple 
loop using `File#renameTo`. The test enforced that the move succeeds; 
however, since old checkpoints may be deleted asynchronously, this may not 
always be the case.
We now use a `FileVisitor` and ignore `IOExceptions` that occur while 
moving.
If no checkpoint file could be moved the test will still fail.
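
A minimal sketch of this approach (class, method, and directory names are 
illustrative assumptions, not the actual test code):

```
import java.io.IOException;
import java.nio.file.FileVisitResult;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.SimpleFileVisitor;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.concurrent.atomic.AtomicInteger;

class CheckpointFileMover {

    /**
     * Moves all files under haStorageDir into backupDir, silently skipping
     * files that vanish due to asynchronous checkpoint deletion; returns
     * how many files were actually moved.
     */
    static int moveCheckpointFiles(Path haStorageDir, Path backupDir) throws IOException {
        AtomicInteger moved = new AtomicInteger();
        Files.walkFileTree(haStorageDir, new SimpleFileVisitor<Path>() {
            @Override
            public FileVisitResult visitFile(Path file, BasicFileAttributes attrs) {
                try {
                    Files.move(file, backupDir.resolve(file.getFileName()));
                    moved.incrementAndGet();
                } catch (IOException ignored) {
                    // the checkpoint may have been deleted asynchronously; skip it
                }
                return FileVisitResult.CONTINUE;
            }
        });
        return moved.get();
    }
}
```

Ignoring the exception per file keeps the move best-effort, while the final 
count still lets the test assert that at least one file was moved.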

## 2)
After the checkpoint files were moved out of the HA storage directory the 
job is thrown into a restart loop. To verify the restart behavior the test was 
polling the job state and checked for the `RESTARTING` and `FAILING` states.
Due to the small size of the job it is in these states only for a short 
time, effectively adding a race condition. Thus this loop may run for longer 
than anticipated; the largest outlier I got locally was 50 seconds, which 
isn't _that_ far off from the 2 minute timeout. I suspect this to be the 
failure cause raised in the JIRA, but I can't guarantee it.
Instead we now access the `fullRestarts` metric using a custom reporter to 
check how many restarts have occurred. The actual _state transitions_ should be 
irrelevant to the test.
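
A rough sketch of such a reporter (hypothetical class, sketched against 
Flink's `MetricReporter` interface; the actual test code may differ):

```
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Metric;
import org.apache.flink.metrics.MetricConfig;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.metrics.reporter.MetricReporter;

public class RestartCountingReporter implements MetricReporter {

    // captured gauge, polled by the test instead of racing on job states
    static volatile Gauge<?> fullRestarts;

    @Override
    public void open(MetricConfig config) {}

    @Override
    public void close() {}

    @Override
    public void notifyOfAddedMetric(Metric metric, String metricName, MetricGroup group) {
        if ("fullRestarts".equals(metricName) && metric instanceof Gauge) {
            fullRestarts = (Gauge<?>) metric;
        }
    }

    @Override
    public void notifyOfRemovedMetric(Metric metric, String metricName, MetricGroup group) {}
}
```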

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9900

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6395.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6395


commit b8827dc3723558c52ad567bf88f24ae34129ea08
Author: zentol 
Date:   2018-07-23T14:21:32Z

[FLINK-9900][tests] Harden ZooKeeperHighAvailabilityITCase




---


[GitHub] flink issue #6378: [FLINK-9236] [pom] upgrade the version of apache parent p...

2018-07-22 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6378
  
@zhangminglei @YCjia Please do not push changes for the sole purpose of 
re-triggering Travis. We already know that they passed (due to the travis 
fork), and realistically there's no way for the parent pom to influence this 
test.


---


[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6372#discussion_r204018623
  
--- Diff: flink-end-to-end-tests/run-pre-commit-tests.sh ---
@@ -51,6 +51,7 @@ run_test "Shaded Hadoop S3A end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_
 run_test "Shaded Presto S3 end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_shaded_presto_s3.sh"
 run_test "Hadoop-free Wordcount end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_hadoop_free.sh"
 run_test "Distributed cache end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_distributed_cache_via_blob.sh"
+run_test "Wordcount end-to-end test in docker env" 
"$END_TO_END_DIR/test-scripts/test_docker_embedded_job.sh"
--- End diff --

if we can't run it on travis it shouldn't be in this script?


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203998139
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile ---
@@ -0,0 +1,159 @@

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+#
+# This image is modified version of Knappek/docker-hadoop-secure
+#   * Knappek/docker-hadoop-secure 
<https://github.com/Knappek/docker-hadoop-secure>
+#
+# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend 
it to start a proper kerberized Hadoop cluster:
+#   * Lewuathe/docker-hadoop-cluster 
<https://github.com/Lewuathe/docker-hadoop-cluster>
+#
+# Author: Aljoscha Krettek
+# Date:   2018 May, 15
+#
+# Creates multi-node, kerberized Hadoop cluster on Docker
+
+FROM sequenceiq/pam:ubuntu-14.04
+MAINTAINER aljoscha
+
+USER root
+
+RUN addgroup hadoop
+RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs
+RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn
+RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred
+
+RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user
+
+# install dev tools
+RUN apt-get update
+RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync 
unzip
+
+# Kerberos client
+RUN apt-get install krb5-user -y
+RUN mkdir -p /var/log/kerberos
+RUN touch /var/log/kerberos/kadmind.log
+
+# passwordless ssh
+RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key 
/root/.ssh/id_rsa
+RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa
+RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
+
+# java
+RUN mkdir -p /usr/java/default && \
+ curl -Ls 
'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz'
 -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \
+ tar --strip-components=1 -xz -C /usr/java/default/
+
+ENV JAVA_HOME /usr/java/default
+ENV PATH $PATH:$JAVA_HOME/bin
+
+RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 
'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip'
+RUN unzip jce_policy-8.zip
+RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security
+
+ENV HADOOP_VERSION=2.8.4
--- End diff --

I agree, but for now we still have to ensure that the hadoop version in 
flink-dist matches, no?


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203997764
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh 
---
@@ -0,0 +1,121 @@
+#!/bin/bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+: ${HADOOP_PREFIX:=/usr/local/hadoop}
+
+$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
+
+rm /tmp/*.pid
+
+# installing libraries if any - (resource urls added comma separated to 
the ACP system variable)
+cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do  echo == 
$cp; curl -LO $cp ; done; cd -
+
+# kerberos client
+sed -i "s/EXAMPLE.COM/${KRB_REALM}/g" /etc/krb5.conf
--- End diff --

yeah nvm, I doubt introducing a placeholder really fixes things :/


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203974298
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/bootstrap.sh 
---
@@ -0,0 +1,121 @@
+#!/bin/bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+: ${HADOOP_PREFIX:=/usr/local/hadoop}
+
+$HADOOP_PREFIX/etc/hadoop/hadoop-env.sh
+
+rm /tmp/*.pid
+
+# installing libraries if any - (resource urls added comma separated to 
the ACP system variable)
+cd $HADOOP_PREFIX/share/hadoop/common ; for cp in ${ACP//,/ }; do  echo == 
$cp; curl -LO $cp ; done; cd -
+
+# kerberos client
+sed -i "s/EXAMPLE.COM/${KRB_REALM}/g" /etc/krb5.conf
--- End diff --

`EXAMPLE.COM` is used in several places; is there any way we can set this 
in a single place? (for example with search-and-replace if necessary)


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203972431
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/config/log4j.properties
 ---
@@ -0,0 +1,354 @@

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+# Define some default values that can be overridden by system properties
+hadoop.root.logger=INFO,console
+hadoop.log.dir=.
+hadoop.log.file=hadoop.log
+
+# Define the root logger to the system property "hadoop.root.logger".
+log4j.rootLogger=${hadoop.root.logger}, EventCounter
+
+# Logging Threshold
+log4j.threshold=ALL
+
+# Null Appender
+log4j.appender.NullAppender=org.apache.log4j.varia.NullAppender
+
+#
+# Rolling File Appender - cap space usage at 5gb.
+#
+hadoop.log.maxfilesize=256MB
+hadoop.log.maxbackupindex=20
+log4j.appender.RFA=org.apache.log4j.RollingFileAppender
+log4j.appender.RFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+log4j.appender.RFA.MaxFileSize=${hadoop.log.maxfilesize}
+log4j.appender.RFA.MaxBackupIndex=${hadoop.log.maxbackupindex}
+
+log4j.appender.RFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.RFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n
+
+
+#
+# Daily Rolling File Appender
+#
+
+log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
+log4j.appender.DRFA.File=${hadoop.log.dir}/${hadoop.log.file}
+
+# Rollover at midnight
+log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
+
+log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
+
+# Pattern format: Date LogLevel LoggerName LogMessage
+log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+# Debugging Pattern format
+#log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %-5p %c{2} 
(%F:%M(%L)) - %m%n
+
+
+#
+# console
+# Add "console" to rootlogger above if you want to use this
+#
+
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{2}: %m%n
+
+#
+# TaskLog Appender
+#
+
+#Default values
+hadoop.tasklog.taskid=null
+hadoop.tasklog.iscleanup=false
+hadoop.tasklog.noKeepSplits=4
+hadoop.tasklog.totalLogFileSize=100
+hadoop.tasklog.purgeLogSplits=true
+hadoop.tasklog.logsRetainHours=12
+
+log4j.appender.TLA=org.apache.hadoop.mapred.TaskLogAppender
+log4j.appender.TLA.taskId=${hadoop.tasklog.taskid}
+log4j.appender.TLA.isCleanup=${hadoop.tasklog.iscleanup}
+log4j.appender.TLA.totalLogFileSize=${hadoop.tasklog.totalLogFileSize}
+
+log4j.appender.TLA.layout=org.apache.log4j.PatternLayout
+log4j.appender.TLA.layout.ConversionPattern=%d{ISO8601} %p %c: %m%n
+
+#
+# HDFS block state change log from block manager
+#
+# Uncomment the following to log normal block state change
+# messages from BlockManager in NameNode.
+#log4j.logger.BlockStateChange=DEBUG
+
+#
+#Security appender
+#
+hadoop.security.logger=INFO,NullAppender
+hadoop.security.log.maxfilesize=256MB
+hadoop.security.log.maxbackupindex=20
+log4j.category.SecurityLogger=${hadoop.security.logger}
+hadoop.security.log.file=SecurityAuth-${user.name}.audit
+log4j.appender.RFAS=org.apache.log4j.RollingFileAppender
+log4j.appender.RFAS.File=${hadoop.log.dir}/${hadoop.security.log.file}
+log4j.appender.RFAS.layout=org.apache.log4j.PatternLayout
+log4j.appender.RFAS.layout.C

[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203972230
  
--- Diff: flink-end-to-end-tests/test-scripts/test_yarn_kerberos_docker.sh 
---
@@ -0,0 +1,104 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+set -o pipefail
+
+source "$(dirname "$0")"/common.sh
+
+FLINK_TARBALL_DIR=$TEST_DATA_DIR
+FLINK_TARBALL=flink.tar.gz
+FLINK_DIRNAME=$(basename $FLINK_DIR)
+
+echo "Flink Tarball directory $FLINK_TARBALL_DIR"
+echo "Flink tarball filename $FLINK_TARBALL"
+echo "Flink distribution directory name $FLINK_DIRNAME"
+echo "End-to-end directory $END_TO_END_DIR"
+docker --version
+docker-compose --version
+
+mkdir -p $FLINK_TARBALL_DIR
+tar czf $FLINK_TARBALL_DIR/$FLINK_TARBALL -C $(dirname $FLINK_DIR) .
+
+echo "Building Hadoop Docker container"
+until docker build -f 
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/Dockerfile -t 
flink/docker-hadoop-secure-cluster:latest 
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/; do
+# with all the downloading and ubuntu updating a lot of flakiness can 
happen, make sure
+# we don't immediately fail
+echo "Something went wrong while building the Docker image, retrying 
..."
+sleep 2
+done
+
+echo "Starting Hadoop cluster"
+docker-compose -f 
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml up 
-d
+
+# make sure we stop our cluster at the end
+function cluster_shutdown {
+  # don't call ourselves again for another signal interruption
+  trap "exit -1" INT
+  # don't call ourselves again for normal exit
+  trap "" EXIT
+
+  docker-compose -f 
$END_TO_END_DIR/test-scripts/docker-hadoop-secure-cluster/docker-compose.yml 
down
+  rm $FLINK_TARBALL_DIR/$FLINK_TARBALL
+}
+trap cluster_shutdown INT
+trap cluster_shutdown EXIT
+
+until docker cp $FLINK_TARBALL_DIR/$FLINK_TARBALL 
master:/home/hadoop-user/; do
+# we're retrying this one because we don't know yet if the container 
is ready
+echo "Uploading Flink tarball to docker master failed, retrying ..."
+sleep 5
+done
+
+# now, at least the container is ready
+docker exec -it master bash -c "tar xzf /home/hadoop-user/$FLINK_TARBALL 
--directory /home/hadoop-user/"
+
+docker exec -it master bash -c "echo \"security.kerberos.login.keytab: 
/home/hadoop-user/hadoop-user.keytab\" >> 
/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
+docker exec -it master bash -c "echo \"security.kerberos.login.principal: 
hadoop-user\" >> /home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
+
+echo "Flink config:"
+docker exec -it master bash -c "cat 
/home/hadoop-user/$FLINK_DIRNAME/conf/flink-conf.yaml"
+
+# make the output path random, just in case it already exists, for example 
if we
+# had cached docker containers
+OUTPUT_PATH=hdfs:///user/hadoop-user/wc-out-$RANDOM
+
+# it's important to run this with higher parallelism, otherwise we might 
risk that
+# JM and TM are on the same YARN node and that we therefore don't test the 
keytab shipping
+until docker exec -it master bash -c "export HADOOP_CLASSPATH=\`hadoop 
classpath\` && /home/hadoop-user/$FLINK_DIRNAME/bin/flink run -m yarn-cluster 
-yn 3 -ys 1 -ytm 1200 -yjm 800 -p 3 
/home/hadoop-user/$FLINK_DIRNAME/examples/streaming/WordCount.jar --output 
$OUTPUT_PATH"; do
+echo "Running the Flink job failed, might be that the cluster is not 
ready yet, retrying ..."
--- End diff --

is there no way to check whether the cluster is ready? The logs contain 
several submission failures due to this :/


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203969501
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/README.md ---
@@ -0,0 +1,118 @@
+# Apache Hadoop Docker image with Kerberos enabled
+
+This image is modified version of Knappek/docker-hadoop-secure
+ * Knappek/docker-hadoop-secure 
<https://github.com/Knappek/docker-hadoop-secure>
+
+With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend 
it to start a proper kerberized Hadoop cluster:
+ * Lewuathe/docker-hadoop-cluster 
<https://github.com/Lewuathe/docker-hadoop-cluster>
+
+And a lot of added stuff for making this an actual, properly configured, 
kerberized cluster with proper user/permissions structure.
+
+Versions
+
+
+* JDK8
+* Hadoop 2.8.3
+
+Default Environment Variables
+-
+
+| Name | Value | Description |
+|  |   |  |
+| `KRB_REALM` | `EXAMPLE.COM` | The Kerberos Realm, more information 
[here](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html#)
 |
+| `DOMAIN_REALM` | `example.com` | The Kerberos Domain Realm, more 
information 
[here](https://web.mit.edu/kerberos/krb5-1.12/doc/admin/conf_files/krb5_conf.html#)
 |
+| `KERBEROS_ADMIN` | `admin/admin` | The KDC admin user |
+| `KERBEROS_ADMIN_PASSWORD` | `admin` | The KDC admin password |
+
+You can simply define these variables in the `docker-compose.yml`.
+
+Run image
+-
+
+Clone the [Github 
project](https://github.com/aljoscha/docker-hadoop-secure-cluster) and run
--- End diff --

point to apache repo instead


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203973291
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile ---
@@ -0,0 +1,159 @@

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+#
+# This image is modified version of Knappek/docker-hadoop-secure
+#   * Knappek/docker-hadoop-secure 
<https://github.com/Knappek/docker-hadoop-secure>
+#
+# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend 
it to start a proper kerberized Hadoop cluster:
+#   * Lewuathe/docker-hadoop-cluster 
<https://github.com/Lewuathe/docker-hadoop-cluster>
+#
+# Author: Aljoscha Krettek
+# Date:   2018 May, 15
+#
+# Creates multi-node, kerberized Hadoop cluster on Docker
+
+FROM sequenceiq/pam:ubuntu-14.04
+MAINTAINER aljoscha
+
+USER root
+
+RUN addgroup hadoop
+RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs
+RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn
+RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred
+
+RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user
+
+# install dev tools
+RUN apt-get update
+RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync 
unzip
+
+# Kerberos client
+RUN apt-get install krb5-user -y
+RUN mkdir -p /var/log/kerberos
+RUN touch /var/log/kerberos/kadmind.log
+
+# passwordless ssh
+RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key 
/root/.ssh/id_rsa
+RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa
+RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
+
+# java
+RUN mkdir -p /usr/java/default && \
+ curl -Ls 
'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz'
 -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \
+ tar --strip-components=1 -xz -C /usr/java/default/
+
+ENV JAVA_HOME /usr/java/default
+ENV PATH $PATH:$JAVA_HOME/bin
+
+RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 
'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip'
+RUN unzip jce_policy-8.zip
+RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security
+
+ENV HADOOP_VERSION=2.8.4
--- End diff --

This potentially uses a different hadoop version than the one flink-dist 
was built against.


---


[GitHub] flink pull request #6377: [FLINK-8981] Add end-to-end test for running on YA...

2018-07-20 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6377#discussion_r203973314
  
--- Diff: 
flink-end-to-end-tests/test-scripts/docker-hadoop-secure-cluster/Dockerfile ---
@@ -0,0 +1,159 @@

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+#
+# This image is modified version of Knappek/docker-hadoop-secure
+#   * Knappek/docker-hadoop-secure 
<https://github.com/Knappek/docker-hadoop-secure>
+#
+# With bits and pieces added from Lewuathe/docker-hadoop-cluster to extend 
it to start a proper kerberized Hadoop cluster:
+#   * Lewuathe/docker-hadoop-cluster 
<https://github.com/Lewuathe/docker-hadoop-cluster>
+#
+# Author: Aljoscha Krettek
+# Date:   2018 May, 15
+#
+# Creates multi-node, kerberized Hadoop cluster on Docker
+
+FROM sequenceiq/pam:ubuntu-14.04
+MAINTAINER aljoscha
+
+USER root
+
+RUN addgroup hadoop
+RUN useradd -d /home/hdfs -ms /bin/bash -G hadoop -p hdfs hdfs
+RUN useradd -d /home/yarn -ms /bin/bash -G hadoop -p yarn yarn
+RUN useradd -d /home/mapred -ms /bin/bash -G hadoop -p mapred mapred
+
+RUN useradd -d /home/hadoop-user -ms /bin/bash -p hadoop-user hadoop-user
+
+# install dev tools
+RUN apt-get update
+RUN apt-get install -y curl tar sudo openssh-server openssh-client rsync 
unzip
+
+# Kerberos client
+RUN apt-get install krb5-user -y
+RUN mkdir -p /var/log/kerberos
+RUN touch /var/log/kerberos/kadmind.log
+
+# passwordless ssh
+RUN rm -f /etc/ssh/ssh_host_dsa_key /etc/ssh/ssh_host_rsa_key 
/root/.ssh/id_rsa
+RUN ssh-keygen -q -N "" -t dsa -f /etc/ssh/ssh_host_dsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /etc/ssh/ssh_host_rsa_key
+RUN ssh-keygen -q -N "" -t rsa -f /root/.ssh/id_rsa
+RUN cp /root/.ssh/id_rsa.pub /root/.ssh/authorized_keys
+
+# java
+RUN mkdir -p /usr/java/default && \
+ curl -Ls 
'http://download.oracle.com/otn-pub/java/jdk/8u131-b11/d54c1d3a095b4ff2b6607d096fa80163/jdk-8u131-linux-x64.tar.gz'
 -H 'Cookie: oraclelicense=accept-securebackup-cookie' | \
+ tar --strip-components=1 -xz -C /usr/java/default/
+
+ENV JAVA_HOME /usr/java/default
+ENV PATH $PATH:$JAVA_HOME/bin
+
+RUN curl -LOH 'Cookie: oraclelicense=accept-securebackup-cookie' 
'http://download.oracle.com/otn-pub/java/jce/8/jce_policy-8.zip'
+RUN unzip jce_policy-8.zip
+RUN cp /UnlimitedJCEPolicyJDK8/local_policy.jar 
/UnlimitedJCEPolicyJDK8/US_export_policy.jar $JAVA_HOME/jre/lib/security
+
+ENV HADOOP_VERSION=2.8.4
+
+# ENV HADOOP_URL 
https://www.apache.org/dist/hadoop/common/hadoop-$HADOOP_VERSION/hadoop-$HADOOP_VERSION.tar.gz
--- End diff --

remove


---


[GitHub] flink issue #6370: [FLINK-9894] [runtime] Potential Data Race

2018-07-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6370
  
If another PR introduces race conditions, then these race conditions should 
be resolved in that very PR.


---


[GitHub] flink pull request #6374: [FLINK-9895][tests] Ensure error logging for Netty...

2018-07-19 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6374

[FLINK-9895][tests] Ensure error logging for NettyLeakDetectionResource

## What is the purpose of the change

This PR is a small addition to #6363 to ensure that ERROR logging is 
enabled for Netty's `ResourceLeakDetector`, as otherwise the leak will not 
cause test failures.

## Verifying this change

* disable error logging in `flink-runtime` for `ResourceLeakDetector`. (see 
`log4j-test.properties`)
* disable auto-release in `FileUploadHandler`
* run `FileUploadHandlerTest`

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9895

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6374.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6374


commit 373d6ef65b50de86897a9da6d403982aae59a3d1
Author: zentol 
Date:   2018-07-19T11:47:53Z

[FLINK-9895][tests] Ensure error logging for NettyLeakDetectionResource




---


[GitHub] flink pull request #6371: [FLINK-9871] Use Description class for ConfigOptio...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6371#discussion_r203692344
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/AkkaOptions.java ---
@@ -42,22 +45,27 @@
public static final ConfigOption WATCH_HEARTBEAT_INTERVAL = 
ConfigOptions
.key("akka.watch.heartbeat.interval")
.defaultValue(ASK_TIMEOUT.defaultValue())
-   .withDescription("Heartbeat interval for Akka’s DeathWatch 
mechanism to detect dead TaskManagers. If" +
-   " TaskManagers are wrongly marked dead because of lost 
or delayed heartbeat messages, then you should" +
-   " decrease this value or increase 
akka.watch.heartbeat.pause. A thorough description of Akka’s DeathWatch" +
-   " can be found http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector\;>here.");
+   .withDescription(Description.builder()
+   .text("Heartbeat interval for Akka’s DeathWatch 
mechanism to detect dead TaskManagers. If" +
--- End diff --

we could think about adding another version of `withDescription` that works 
like `text`, so that you could write this as
```
withDescription(
"Heartbeat interval for Akka’s DeathWatch mechanism to detect dead 
TaskManagers. If" +
..
" Akka’s DeathWatch can be found %s",

link("http://doc.akka.io/docs/akka/snapshot/scala/remoting.html#failure-detector;,
 "here"));
```

Just a thought.
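
Such an overload might look roughly like this (hypothetical signature, 
assuming the `Description.builder().text(format, elements)` API from this 
PR series; not existing code):

```
// Hypothetical convenience overload on ConfigOption; illustrative only.
public ConfigOption withDescription(String format, InlineElement... elements) {
    return withDescription(Description.builder().text(format, elements).build());
}
```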


---


[GitHub] flink pull request #:

2018-07-19 Thread zentol
Github user zentol commented on the pull request:


https://github.com/apache/flink/commit/ec28f92ffd042308494d9661a38ab462738611aa#commitcomment-29761887
  
In 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java:
In 
flink-core/src/main/java/org/apache/flink/configuration/Configuration.java on 
line 744:
An option may be set for both the current and the deprecated key(s) at the same time.


---


[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6363#discussion_r203662639
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector;
+import 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory;
+
+import org.junit.rules.ExternalResource;
+
+import javax.annotation.concurrent.GuardedBy;
+
+import static org.junit.Assert.fail;
+
+/**
+ * JUnit resource to fail with an assertion when Netty detects a resource 
leak (only with
+ * ERROR logging enabled for
+ * 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector).
--- End diff --

Would it be possible to check for this condition by doing something like 
this in `@Before`?

```

Assert.assertTrue(LoggerFactory.getLogger(org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector.class).isErrorEnabled());
```


---


[GitHub] flink issue #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHandler

2018-07-19 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6363
  
merging.


---


[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6355#discussion_r203658272
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyServer.java
 ---
@@ -61,6 +62,7 @@
 
private ChannelFuture bindFuture;
 
+   private SSLUtils.SSLServerConfiguration serverSSLConfig;
--- End diff --

add `@Nullable`


---


[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6355#discussion_r203657904
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/NettyClient.java
 ---
@@ -52,6 +55,7 @@
 
private Bootstrap bootstrap;
 
+   private SSLUtils.SSLClientConfiguration clientSSLConfig;
--- End diff --

add `@Nullable`


---


[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6355#discussion_r203652194
  
--- Diff: docs/ops/security-ssl.md ---
@@ -33,6 +33,9 @@ SSL can be enabled for all network communication between 
Flink components. SSL k
 * **akka.ssl.enabled**: SSL flag for akka based control connection between 
the Flink client, jobmanager and taskmanager 
 * **jobmanager.web.ssl.enabled**: Flag to enable https access to the 
jobmanager's web frontend
 
+Please see the configuration page about the
+[complete list of SSL configuration 
parameters]({{site.baseurl}}/ops/config.html#ssl-settings), in particular 
**security.ssl.session-cache-size**.
--- End diff --

just a suggestion: you could also embed the entire table directly; see 
`Configuration.md` for how to do it.


---


[GitHub] flink pull request #6342: [FLINK-9748][release] Use dedicated directory for ...

2018-07-19 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6342#discussion_r203629175
  
--- Diff: tools/releasing/create_binary_release.sh ---
@@ -44,6 +44,12 @@ else
 SHASUM="sha512sum"
 fi
 
+cd ..
+
+FLINK_DIR=`pwd`
+RELEASE_DIR=${RELEASE_DIR}/tools/releasing/release
--- End diff --

yes!



---


[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6355#discussion_r203437995
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ---
@@ -160,4 +160,41 @@
key("security.ssl.verify-hostname")
.defaultValue(true)
.withDescription("Flag to enable peer’s hostname 
verification during ssl handshake.");
+
+   /**
+* SSL session cache size.
+*/
+   public static final ConfigOption SSL_SESSION_CACHE_SIZE =
+   key("security.ssl.session-cache-size")
+   .defaultValue(-1)
+   .withDescription("The size of the cache used for 
storing SSL session objects. "
+   + "According to 
https://github.com/netty/netty/issues/832, you should always set "
+   + "this to an appropriate number to not run 
into a bug with stalling IO threads "
+   + "during garbage collection. (-1 = use system 
default).");
+
+   /**
+* SSL session timeout.
+*/
+   public static final ConfigOption SSL_SESSION_TIMEOUT =
+   key("security.ssl.session-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) for the cached 
SSL session objects. (-1 = use system default)");
+
+   /**
+* SSL session timeout during handshakes.
+*/
+   public static final ConfigOption SSL_HANDSHAKE_TIMEOUT =
+   key("security.ssl.handshake-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) during SSL 
handshake. (-1 = use system default)");
+
+   /**
+* SSL session timeout after flushing the `close_notify` message.
+*/
+   public static final ConfigOption 
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+   key("security.ssl.close-notify-flush-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) for flushing the 
`close_notify` that was triggered by closing a " +
--- End diff --

it's not showing up as a code block since that only works for markdown; the 
description so far was plain-text.


---


[GitHub] flink issue #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHandler

2018-07-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6363
  
+1


---


[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6363#discussion_r203422176
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector;
+import 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory;
+
+import org.junit.rules.ExternalResource;
+
+import static org.junit.Assert.fail;
+
+/**
+ * JUnit resource to fail with an assertion when Netty detects a resource 
leak (only with
+ * ERROR logging enabled for
+ * 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector).
+ *
+ * This should be used in a class rule:
+ * {@code
+ * @literal @ClassRule
+ *  public static final NettyLeakDetectionResource LEAK_DETECTION = new 
NettyLeakDetectionResource();
+ * }
+ */
+public class NettyLeakDetectionResource extends ExternalResource {
+   private static ResourceLeakDetectorFactory previousLeakDetector;
+   private static ResourceLeakDetector.Level previousLeakDetectorLevel;
+
+   @Override
+   protected void before() {
+   previousLeakDetector = ResourceLeakDetectorFactory.instance();
+   previousLeakDetectorLevel = ResourceLeakDetector.getLevel();
+
+   
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+   ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new 
FailingResourceLeakDetectorFactory());
--- End diff --

so this isn't something we necessarily have to deal with now, but if 
multiple tests that use this resource run in parallel we might not reset to the 
correct factory/level.

Effectively what we need is some kind of ref-counting, so that only the 
first resource modifies the level and factory, and only the last resource 
resets them. :/
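
A sketch of that idea (illustrative names; the locking and counting details 
would need care in real code):

```
import org.junit.rules.ExternalResource;

// Illustrative sketch of a ref-counted JUnit resource: only the first
// active instance installs the global state, only the last one restores it.
public abstract class RefCountedResource extends ExternalResource {

    private static final Object LOCK = new Object();
    private static int refCount = 0;

    @Override
    protected void before() {
        synchronized (LOCK) {
            if (refCount++ == 0) {
                install(); // save the previous factory/level, set the failing ones
            }
        }
    }

    @Override
    protected void after() {
        synchronized (LOCK) {
            if (--refCount == 0) {
                uninstall(); // restore the previously saved factory/level
            }
        }
    }

    /** Installs the global test configuration. */
    protected abstract void install();

    /** Restores the configuration that install() replaced. */
    protected abstract void uninstall();
}
```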




---


[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6363#discussion_r203414529
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -220,4 +224,5 @@ public void testUploadCleanupOnFailure() throws 
IOException {
}
MULTIPART_UPLOAD_RESOURCE.assertUploadDirectoryIsEmpty();
}
+
--- End diff --

revert


---


[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6363#discussion_r203414700
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyLeakDetectionResource.java
 ---
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector;
+import 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetectorFactory;
+
+import org.junit.rules.ExternalResource;
+
+import static org.junit.Assert.fail;
+
+/**
+ * JUnit resource to fail with an assertion when Netty detects a resource 
leak (only with
+ * ERROR logging enabled for
+ * 
org.apache.flink.shaded.netty4.io.netty.util.ResourceLeakDetector).
+ *
+ * This should be used in a class rule:
+ * {@code
+ * @literal @ClassRule
+ *  public static final NettyLeakDetectionResource LEAK_DETECTION = new 
NettyLeakDetectionResource();
+ * }
+ */
+public class NettyLeakDetectionResource extends ExternalResource {
+   private static ResourceLeakDetectorFactory previousLeakDetector;
+   private static ResourceLeakDetector.Level previousLeakDetectorLevel;
+
+   @Override
+   protected void before() {
+   previousLeakDetector = ResourceLeakDetectorFactory.instance();
+   previousLeakDetectorLevel = ResourceLeakDetector.getLevel();
+
+   
ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+   ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(new 
FailingResourceLeakDetectorFactory());
+   }
+
+   @Override
+   protected void after() {
+   
ResourceLeakDetectorFactory.setResourceLeakDetectorFactory(previousLeakDetector);
+   ResourceLeakDetector.setLevel(previousLeakDetectorLevel);
+   }
+
+   static class FailingResourceLeakDetectorFactory extends 
ResourceLeakDetectorFactory {
--- End diff --

these could be private?


---


[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...

2018-07-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6352
  
yay travis is green.


---


[GitHub] flink pull request #6363: [FLINK-9860][REST] fix buffer leak in FileUploadHa...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6363#discussion_r203373280
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/FileUploadHandlerTest.java
 ---
@@ -50,6 +55,24 @@
 
private static final ObjectMapper OBJECT_MAPPER = 
RestMapperUtils.getStrictObjectMapper();
 
+   private static ResourceLeakDetectorFactory previousLeakDetector;
+   private static ResourceLeakDetector.Level previousLeakDetectorLevel;
+
+   @BeforeClass
+   public static void setLeakDetector() {
--- End diff --

we could move this into a utility `LeakDetectorResource` so we can re-use 
it for any test by simply adding it as a `@Rule`.


---


[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6330#discussion_r203361773
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -138,12 +154,22 @@ public JarRunHandler(
});
}
 
-   private static SavepointRestoreSettings getSavepointRestoreSettings(
-   final @Nonnull HandlerRequest request)
+   private SavepointRestoreSettings getSavepointRestoreSettings(
+   final @Nonnull HandlerRequest request)
throws RestHandlerException {
 
-   final boolean allowNonRestoredState = 
getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
-   final String savepointPath = getQueryParameter(request, 
SavepointPathQueryParameter.class);
+   final JarRunRequestBody requestBody = request.getRequestBody();
+
+   final boolean allowNonRestoredState = 
fromRequestBodyOrQueryParameter(
+   requestBody.getAllowNonRestoredState(),
+   () -> getQueryParameter(request, 
AllowNonRestoredStateQueryParameter.class),
+   false,
+   log);
+   final String savepointPath = fromRequestBodyOrQueryParameter(
--- End diff --

How does this prevent the scenario I described?


---


[GitHub] flink issue #6296: [FLINK-9793] YARN:When submitting a flink job with yarn-c...

2018-07-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6296
  
merging.


---


[GitHub] flink pull request #6362: [FLINK-9888][release] Remove unsafe defaults from ...

2018-07-18 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6362

[FLINK-9888][release] Remove unsafe defaults from release scripts

## What is the purpose of the change

This PR removes several unnecessary and unsafe `*_VERSION` defaults from 
the release scripts.
The scripts should never be called without explicitly setting these values.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9888

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6362.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6362


commit 93c8f557af888020dca8ae81ae117f4e460f475d
Author: zentol 
Date:   2018-07-18T11:42:06Z

[FLINK-9888][release] Remove unsafe defaults from release scripts




---


[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...

2018-07-18 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6352
  
The `YARNHighAvailabilityITCase` still has the same problem.


---


[GitHub] flink pull request #6355: [FLINK-9878][network][ssl] add more low-level ssl ...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6355#discussion_r203326103
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/SecurityOptions.java ---
@@ -160,4 +160,41 @@
key("security.ssl.verify-hostname")
.defaultValue(true)
.withDescription("Flag to enable peer’s hostname 
verification during ssl handshake.");
+
+   /**
+* SSL session cache size.
+*/
+   public static final ConfigOption SSL_SESSION_CACHE_SIZE =
+   key("security.ssl.session-cache-size")
+   .defaultValue(-1)
+   .withDescription("The size of the cache used for 
storing SSL session objects. "
+   + "According to 
https://github.com/netty/netty/issues/832, you should always set "
+   + "this to an appropriate number to not run 
into a bug with stalling IO threads "
+   + "during garbage collection. (-1 = use system 
default).");
+
+   /**
+* SSL session timeout.
+*/
+   public static final ConfigOption SSL_SESSION_TIMEOUT =
+   key("security.ssl.session-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) for the cached 
SSL session objects. (-1 = use system default)");
+
+   /**
+* SSL session timeout during handshakes.
+*/
+   public static final ConfigOption SSL_HANDSHAKE_TIMEOUT =
+   key("security.ssl.handshake-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) during SSL 
handshake. (-1 = use system default)");
+
+   /**
+* SSL session timeout after flushing the `close_notify` message.
+*/
+   public static final ConfigOption 
SSL_CLOSE_NOTIFY_FLUSH_TIMEOUT =
+   key("security.ssl.close-notify-flush-timeout")
+   .defaultValue(-1)
+   .withDescription("The timeout (in ms) for flushing the 
`close_notify` that was triggered by closing a " +
--- End diff --

could you try removing the ` signs? let's see if that trips up the test.


---


[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6330#discussion_r203309580
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link RequestBody} for running a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarRunRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+   private static final String FIELD_NAME_PROGRAM_ARGUMENTS = 
"programArgs";
+   private static final String FIELD_NAME_PARALLELISM = "parallelism";
+   private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = 
"allowNonRestoredState";
+   private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+
+   @JsonProperty(FIELD_NAME_ENTRY_CLASS)
+   @Nullable
+   private String entryClassName;
+
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+   @Nullable
+   private String programArguments;
+
+   @JsonProperty(FIELD_NAME_PARALLELISM)
+   @Nullable
+   private Integer parallelism;
+
+   @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+   @Nullable
+   private Boolean allowNonRestoredState;
+
+   @JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+   @Nullable
+   private String savepointPath;
+
+   public JarRunRequestBody() {
+   this(null, null, null, null, null);
+   }
+
+   @JsonCreator
+   public JarRunRequestBody(
+   @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String 
programArguments,
+   @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism,
+   @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) 
Boolean allowNonRestoredState,
+   @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String 
savepointPath) {
--- End diff --

For example, if only a partial body is sent, some fields may be null. I 
couldn't quickly find a way to allow either all fields or none to be null.


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r203306483
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/Formatter.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Allows providing multiple formatters for the description. E.g. Html 
formatter, Markdown formatter etc.
+ */
+public abstract class Formatter {
+
+   private StringBuilder state = new StringBuilder();
+
+   /**
+* Formats the description into a String using format specific tags.
+*
+* @param description description to be formatted
+* @return string representation of the description
+*/
+   public String format(Description description) {
+   for (BlockElement blockElement : description.getBlocks()) {
+   blockElement.format(this);
+   }
+   return finalizeFormatting();
+   }
+
+   public void format(LinkElement element) {
+   formatLink(state, element.getLink(), element.getText());
+   }
+
+   public void format(TextElement element) {
+   String[] inlineElements = element.getElements().stream().map(el 
-> {
+   Formatter formatter = newInstance();
+   el.format(formatter);
+   return formatter.finalizeFormatting();
+   }
+   ).toArray(String[]::new);
+   formatText(state, escapeFormatPlaceholder(element.getFormat()), 
inlineElements);
+   }
+
+   public void format(LineBreakElement element) {
+   formatLineBreak(state);
+   }
+
+   public void format(ListElement element) {
+   String[] inlineElements = element.getEntries().stream().map(el 
-> {
+   Formatter formatter = newInstance();
+   el.format(formatter);
+   return formatter.finalizeFormatting();
+   }
+   ).toArray(String[]::new);
+   formatList(state, inlineElements);
+   }
+
+   private String finalizeFormatting() {
+   String result = state.toString();
+   state.setLength(0);
+   return result.replaceAll("%%", "%");
--- End diff --

If a link description contains `%%` this will unintentionally modify that, 
correct? Granted this is quite an edge-case.
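
A quick repro of the edge case (illustrative sketch, not part of the PR):
```
// a description whose text legitimately contains a literal "%%"
String state = "reach 100%% coverage";
// finalizeFormatting() collapses every "%%" pair, including this one:
String result = state.replaceAll("%%", "%");
System.out.println(result); // prints "reach 100% coverage" -- the literal is lost
```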


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r203305919
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/Formatter.java
 ---
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Allows providing multiple formatters for the description. E.g. Html 
formatter, Markdown formatter etc.
+ */
+public abstract class Formatter {
+
+   private StringBuilder state = new StringBuilder();
--- End diff --

could be final


---


[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6330#discussion_r203303832
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java
 ---
@@ -138,12 +154,22 @@ public JarRunHandler(
});
}
 
-   private static SavepointRestoreSettings getSavepointRestoreSettings(
-   final @Nonnull HandlerRequest<EmptyRequestBody, JarRunMessageParameters> request)
+   private SavepointRestoreSettings getSavepointRestoreSettings(
+   final @Nonnull HandlerRequest<JarRunRequestBody, JarRunMessageParameters> request)
throws RestHandlerException {
 
-   final boolean allowNonRestoredState = 
getQueryParameter(request, AllowNonRestoredStateQueryParameter.class, false);
-   final String savepointPath = getQueryParameter(request, 
SavepointPathQueryParameter.class);
+   final JarRunRequestBody requestBody = request.getRequestBody();
+
+   final boolean allowNonRestoredState = 
fromRequestBodyOrQueryParameter(
+   requestBody.getAllowNonRestoredState(),
+   () -> getQueryParameter(request, 
AllowNonRestoredStateQueryParameter.class),
+   false,
+   log);
+   final String savepointPath = fromRequestBodyOrQueryParameter(
--- End diff --

This could result in unexpected NullPointerExceptions when retrieving a 
primitive, like in the following example:
```
fromRequestBodyOrQueryParameter(
requestBody.getParallelism(),
() -> getQueryParameter(request, ParallelismQueryParameter.class),
log);
```
The explicit default argument prevents that from happening.
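
For readers skimming the thread, a self-contained sketch of the failure mode (all names hypothetical):
```
Integer fromBody = null;  // field absent from the JSON body
Integer fromQuery = null; // parameter absent from the query string
// without an explicit default the result is unboxed into a primitive,
// which throws a NullPointerException at runtime:
int parallelism = fromBody != null ? fromBody : fromQuery; // NPE here
```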


---


[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6330#discussion_r203302981
  
--- Diff: 
flink-runtime-web/src/test/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandlerParameterTest.java
 ---
@@ -0,0 +1,313 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.BlobServerOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.dispatcher.DispatcherGateway;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
+import org.apache.flink.runtime.messages.Acknowledge;
+import org.apache.flink.runtime.rest.handler.HandlerRequest;
+import org.apache.flink.runtime.rest.handler.HandlerRequestException;
+import org.apache.flink.runtime.rest.messages.MessageParameter;
+import org.apache.flink.runtime.rest.messages.MessageQueryParameter;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.util.BlobServerResource;
+import org.apache.flink.runtime.webmonitor.TestingDispatcherGateway;
+import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
+import org.apache.flink.runtime.webmonitor.testutils.ParameterProgram;
+import org.apache.flink.util.function.SupplierWithException;
+import org.apache.flink.util.function.ThrowingConsumer;
+
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+
+/**
+ * Tests for the parameter handling of the {@link JarRunHandler}.
+ */
+public class JarRunHandlerParameterTest {
+
+   @ClassRule
+   public static final TemporaryFolder TMP = new TemporaryFolder();
+
+   @ClassRule
+   public static final BlobServerResource BLOB_SERVER_RESOURCE = new 
BlobServerResource();
+
+   private static final AtomicReference<JobGraph> lastSubmittedJobGraphReference = new AtomicReference<>();
+   private static JarRunHandler handler;
+   private static Path jarWithManifest;
+   private static Path jarWithoutManifest;
+   private static TestingDispatcherGateway restfulGateway;
+
+   @BeforeClass
+   public static void setup() throws Exception {
+   Path jarDir = TMP.newFolder().toPath();
+
+   // properties are set by the surefire plugin
+   final String parameterProgramJarName = 
System.getProperty("parameterJarName") + ".jar";
+   final String parameterProgramWithoutManifestJarName = 
System.getProperty("parameterJarWithoutManifestName") + ".jar";
+   final Path jarLocation = 
Paths.get(System.getProperty("targetDir"));
+
+   jarWithManifest = Files.copy(
+   jarLocation.resolve(parameterProgramJarName),
+   jarDir.resolve("program-with-manifest.jar"));
+   jarWithoutManifest = Files.copy(
+   
jarLocation.resolve(parameterProgramWithoutManifestJarName),
+   jarDir.resolve("program-without-manifest.jar"));
+
+   Configuration config = new Configuration();
+   config.setString(BlobServerOptions.STORAGE_DIRECTORY,
+   TMP.newFolder()


---


[GitHub] flink pull request #6330: [FLINK-9499][rest] Support JSON request in JarHand...

2018-07-18 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6330#discussion_r203302518
  
--- Diff: 
flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunRequestBody.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.webmonitor.handlers;
+
+import org.apache.flink.runtime.rest.messages.RequestBody;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+
+import javax.annotation.Nullable;
+
+/**
+ * {@link RequestBody} for running a jar.
+ */
+@JsonInclude(JsonInclude.Include.NON_NULL)
+public class JarRunRequestBody implements RequestBody {
+
+   private static final String FIELD_NAME_ENTRY_CLASS = "entryClass";
+   private static final String FIELD_NAME_PROGRAM_ARGUMENTS = 
"programArgs";
+   private static final String FIELD_NAME_PARALLELISM = "parallelism";
+   private static final String FIELD_NAME_ALLOW_NON_RESTORED_STATE = 
"allowNonRestoredState";
+   private static final String FIELD_NAME_SAVEPOINT_PATH = "savepointPath";
+
+   @JsonProperty(FIELD_NAME_ENTRY_CLASS)
+   @Nullable
+   private String entryClassName;
+
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS)
+   @Nullable
+   private String programArguments;
+
+   @JsonProperty(FIELD_NAME_PARALLELISM)
+   @Nullable
+   private Integer parallelism;
+
+   @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE)
+   @Nullable
+   private Boolean allowNonRestoredState;
+
+   @JsonProperty(FIELD_NAME_SAVEPOINT_PATH)
+   @Nullable
+   private String savepointPath;
+
+   public JarRunRequestBody() {
+   this(null, null, null, null, null);
+   }
+
+   @JsonCreator
+   public JarRunRequestBody(
+   @JsonProperty(FIELD_NAME_ENTRY_CLASS) String 
entryClassName,
+   @JsonProperty(FIELD_NAME_PROGRAM_ARGUMENTS) String 
programArguments,
+   @JsonProperty(FIELD_NAME_PARALLELISM) Integer 
parallelism,
+   @JsonProperty(FIELD_NAME_ALLOW_NON_RESTORED_STATE) 
Boolean allowNonRestoredState,
+   @JsonProperty(FIELD_NAME_SAVEPOINT_PATH) String 
savepointPath) {
--- End diff --

yes they should be nullable
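
To illustrate (sketch; the entry class and arguments are made up), a partial body simply leaves the remaining fields null:
```
JarRunRequestBody body = new JarRunRequestBody(
    "org.example.WordCount",       // entryClass
    "--input in.txt --output out", // programArgs
    4,                             // parallelism
    null,                          // allowNonRestoredState not provided
    null);                         // savepointPath not provided
```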


---


[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6352
  
will look into them tomorrow


---


[GitHub] flink issue #6352: [FLINK-9815][yarn][tests] Harden tests against slow job s...

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6352
  
the test failures may highlight tests that weren't shutting down the last 
application properly; previously this would've succeeded since the check was 
done in `@Before`.


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r203074481
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("%s", link, 
description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

ok that makes sense, do we maybe want to handle this in `Formatter` already?


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r203069199
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("%s", link, 
description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

and what is the actual issue?


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r203069177
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("%s", link, 
description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
--- End diff --

and my point is exactly to prevent bugs from masking each other. The 
placeholder was safe to use since we could be reasonably sure it will not be 
present in the input. Once two entities use the same placeholder, with access to 
the same input, this is no longer the case.
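
A small sketch of the masking hazard (illustrative only):
```
String placeholder = "superRandomTemporaryPlaceholder";
// component A escapes "%s" so that String.format will not consume it:
String escaped = "keep %s literal".replaceAll("%s", placeholder);
// if component B reuses the same placeholder and un-escapes too early,
// the "%s" reappears and is consumed after all:
String unescaped = escaped.replaceAll(placeholder, "%s");
String result = String.format(unescaped, "oops"); // "keep oops literal"
```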


---


[GitHub] flink pull request #6352: [FLINK-8163][yarn][tests] Harden tests against slo...

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6352

[FLINK-8163][yarn][tests] Harden tests against slow job shutdowns

## What is the purpose of the change

This PR hardens the `YarnTestBase` against jobs that just don't want to 
shut down that quickly (i.e. within 500ms).
The maximum waiting time has been increased to 10 seconds, during which we 
periodically check the state of all applications.

Additionally, the failure condition from `@Before` was moved to the 
`@After` method.

This change will allow us to better differentiate between simple timing 
issues and unsuccessful job shutdowns.
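
The polling pattern described above, roughly (sketch; `getRunningApplications` and `fail` stand in for the test base's hypothetical helpers):
```
import java.time.Instant;

Instant deadline = Instant.now().plusSeconds(10);
while (Instant.now().isBefore(deadline)) {
    if (getRunningApplications().isEmpty()) {
        return; // every application shut down in time
    }
    Thread.sleep(500L); // poll every 500ms
}
fail("Applications were still running after 10 seconds.");
```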

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 8163

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6352.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6352


commit 0dd65378f0c9f477bb8f5712bbc0b1f31440f5f0
Author: zentol 
Date:   2018-07-17T11:29:16Z

[FLINK-8163][yarn][tests] Harden tests against slow job shutdowns




---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6347
  
yes


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202970121
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("%s", link, 
description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
+
+   private static String escapeCharacters(String value) {
+   return value
+   .replaceAll("%s", TEMPORARY_PLACEHOLDER)
--- End diff --

can you give me an example of where this is problematic? Does this occur 
if the _final formatted_ description contains `%`?


---


[GitHub] flink pull request #6312: [FLINK-9792] Added custom Description class for Co...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6312#discussion_r202969620
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/HtmlFormatter.java
 ---
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.configuration.description;
+
+/**
+ * Formatter that transforms {@link Description} into Html representation.
+ */
+public class HtmlFormatter extends Formatter {
+
+   @Override
+   protected void formatLink(StringBuilder state, String link, String 
description) {
+   state.append(String.format("%s", link, 
description));
+   }
+
+   @Override
+   protected void formatLineBreak(StringBuilder state) {
+   state.append("");
+   }
+
+   @Override
+   protected void formatText(StringBuilder state, String format, String[] 
elements) {
+   String escapedFormat = escapeCharacters(format);
+   state.append(String.format(escapedFormat, elements));
+   }
+
+   @Override
+   protected void formatList(StringBuilder state, String[] entries) {
+   state.append("");
+   for (String entry : entries) {
+   state.append(String.format("%s", entry));
+   }
+   state.append("");
+   }
+
+   @Override
+   protected Formatter newInstance() {
+   return new HtmlFormatter();
+   }
+
+   private static final String TEMPORARY_PLACEHOLDER = 
"superRandomTemporaryPlaceholder";
--- End diff --

this should be distinct from the placeholder in `Utils`.


---


[GitHub] flink pull request #6294: [FLINK-9013][docs] Document yarn.containers.vcores...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6294#discussion_r202968456
  
--- Diff: 
flink-core/src/main/java/org/apache/flink/configuration/description/TextElement.java
 ---
@@ -53,6 +55,16 @@ public static TextElement text(String text) {
return new TextElement(text, Collections.emptyList());
}
 
+   /**
+* Tries to format the text as code.
+*
+* @return text element with applied formatting
+*/
+   public TextElement formatAsCode() {
--- End diff --

alternatively we could add an explicit `Code` `InlineElement`.


---


[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...

2018-07-17 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6350#discussion_r202962522
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ---
@@ -457,13 +457,20 @@ public CheckpointTriggerResult triggerCheckpoint(
Execution[] executions = new Execution[tasksToTrigger.length];
for (int i = 0; i < tasksToTrigger.length; i++) {
Execution ee = 
tasksToTrigger[i].getCurrentExecutionAttempt();
-   if (ee != null && ee.getState() == 
ExecutionState.RUNNING) {
-   executions[i] = ee;
-   } else {
+   if (ee == null) {
--- End diff --

according to the `ExecutionVertex` docs this branch shouldn't be necessary 
at all, but I kept it in to be safe.


---


[GitHub] flink pull request #6350: [FLINK-9873][runtime] Log task state when aborting...

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6350

[FLINK-9873][runtime] Log task state when aborting checkpoint

## What is the purpose of the change

This PR adjusts the logging message for when a checkpoint is declined due 
to tasks not being ready.
We now explicitly log the current task state.
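
In essence the trigger loop now distinguishes the two abort cases, roughly like this (sketch, abbreviated; see the diff quoted earlier for the exact shape):
```
if (ee == null) {
    LOG.info("Checkpoint triggering task {} is not being executed at the moment. Aborting checkpoint.",
        tasksToTrigger[i].getTaskNameWithSubtaskIndex());
} else if (ee.getState() == ExecutionState.RUNNING) {
    executions[i] = ee;
} else {
    LOG.info("Checkpoint triggering task {} is not in state {} but {} instead. Aborting checkpoint.",
        tasksToTrigger[i].getTaskNameWithSubtaskIndex(), ExecutionState.RUNNING, ee.getState());
}
```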


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9873

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6350.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6350


commit 4dfe622fc45a2233dbba58640d6aa67be4739f86
Author: zentol 
Date:   2018-07-17T07:34:45Z

[FLINK-9873][runtime] Log task state when aborting checkpoint




---


[GitHub] flink pull request #6349: [FLINK-6997][tests] Properly cancel test job

2018-07-17 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6349

[FLINK-6997][tests] Properly cancel test job

## What is the purpose of the change

With this PR the jobs started in 
`SavepointITCase#testSavepointForJobWithIteration` are properly canceled. 
Previously they remained in a running state until the cluster was shut down, 
causing several exceptions to be logged.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9872

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6349.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6349


commit 434d52a4efaa31da59b04ede010b6a7757ebbcbc
Author: zentol 
Date:   2018-07-17T09:34:29Z

[FLINK-6997][tests] Properly cancel test job




---


[GitHub] flink issue #6347: [hotfix] consistency: vertexes -> vertices

2018-07-17 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6347
  
`vertices` is the correct plural, but this is another one of those cases 
where fixing it might cause more harm than good since it could cause merge 
conflicts, yet provides no functional benefit.

Additionally this PR makes a lot of whitespace changes that should be 
reverted in any case.


---


[GitHub] flink pull request #6342: [FLINK-9748][release] Use dedicated directory for ...

2018-07-16 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6342

[FLINK-9748][release] Use dedicated directory for release artifacts

## What is the purpose of the change

With this PR artifacts created during the release process are no longer 
placed in the root flink directory, but instead in a dedicated directory under 
`/tools/releasing`.
This makes it easier to reset the repository state in case of an error, as 
all you have to do is remove said directory. It also prevents accidentally 
committing release files.
In case of success this directory will contain all release artifacts that 
should be uploaded.

Additionally this PR introduces variables for commonly used directories 
(flink root directory, release directory, flink-clone directory) and reduces 
usages of relative paths.

## Brief change log

* modified source/binary release scripts to use a dedicated directory for 
storing release artifacts
* modified rat-plugin to exclude release directory
* modified .gitignore to exclude release directory

## Verifying this change

Manually verified.

@aljoscha @tillrohrmann I'd appreciate your input.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9748

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6342.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6342


commit 92bf5ce764dafa82fcdc2ad3c625d194979c76d9
Author: zentol 
Date:   2018-07-16T13:16:19Z

[FLINK-9748][release] Use dedicated directory for release artifacts




---


[GitHub] flink issue #6327: [FLINK-9839][e2e] add end-to-end tests with SSL enabled

2018-07-16 Thread zentol
Github user zentol commented on the issue:

https://github.com/apache/flink/pull/6327
  
merging.


---


[GitHub] flink pull request #6340: [FLINK-9842][rest] Pass actual configuration to Bl...

2018-07-16 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6340#discussion_r202655858
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/rest/handler/job/JobSubmitHandlerTest.java
 ---
@@ -57,24 +60,39 @@
 /**
  * Tests for the {@link JobSubmitHandler}.
  */
+@RunWith(Parameterized.class)
 public class JobSubmitHandlerTest extends TestLogger {
 
+   @Parameterized.Parameters(name = "SSL enabled: {0}")
+   public static Iterable<Boolean> data() {
+   return Arrays.asList(true, false);
+   }
+
@ClassRule
public static final TemporaryFolder TEMPORARY_FOLDER = new 
TemporaryFolder();
-   private static BlobServer blobServer;
 
-   @BeforeClass
-   public static void setup() throws IOException {
-   Configuration config = new Configuration();
+   private final Configuration sslConfig;
--- End diff --

true, I'll give it a more generic name while merging.


---

