[GitHub] flink pull request #6419: [FLINK-9949][tests] Kill Flink processes in DB/tea...

2018-07-25 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6419

[FLINK-9949][tests] Kill Flink processes in DB/teardown

## What is the purpose of the change

*Not killing Flink processes at the end of a test can cause interference with subsequent test runs.*

## Brief change log
  - *Kill Flink processes in `DB/teardown!`.*

## Verifying this change

This change added tests and can be verified as follows:

  - *Ran tests in docker.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9949

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6419.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6419


commit db3cecb5c9fb16d707e02b436244ba8fd5ee1ce8
Author: gyao 
Date:   2018-07-25T13:28:40Z

[FLINK-9949][tests] Kill Flink processes in DB/teardown




---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205110205
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
@@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception 
{
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+   boolean infinite = params.getBoolean("infinite", false);
 
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
int numKeys = loadFactor * 128 * 1024;
-   DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
+   DataSet<Tuple2<String, Integer>> x1Keys;
    DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
    DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
 
+   if (infinite) {
+       x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0);
--- End diff --

Yes, I didn't want to pass elements from the infinite source down the graph, so as not to run into OOMs because of its infinite nature. That's why I am filtering those elements out.

Do you think we should pass them anyway?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205106925
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
@@ -66,14 +61,21 @@ public static void main(String[] args) throws Exception 
{
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+   boolean infinite = params.getBoolean("infinite", false);
 
ExecutionEnvironment env = 
ExecutionEnvironment.getExecutionEnvironment();
 
int numKeys = loadFactor * 128 * 1024;
-   DataSet<Tuple2<String, Integer>> x1Keys = env.createInput(new Generator(numKeys, 1)).setParallelism(4);
+   DataSet<Tuple2<String, Integer>> x1Keys;
    DataSet<Tuple2<String, Integer>> x2Keys = env.createInput(new Generator(numKeys * 32, 2)).setParallelism(4);
    DataSet<Tuple2<String, Integer>> x8Keys = env.createInput(new Generator(numKeys, 8)).setParallelism(4);
 
+   if (infinite) {
+       x1Keys = env.createInput(Generator.infinite()).setParallelism(4).filter(t -> t.f1 >= 0);
--- End diff --

This now filters out all data.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205108213
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
--- End diff --

outdated


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205108561
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+   // total number of records
+   private final long numRecords;
+   // total number of keys
+   private final long numKeys;
+
+   // records emitted per partition
+   private long recordsPerPartition;
+   // number of keys per partition
+   private long keysPerPartition;
+
+   // number of currently emitted records
+   private long recordCnt;
+
+   // id of current partition
+   private int partitionId;
+
+   private final boolean infinite;
+
+   public Generator(long numKeys, int recordsPerKey) {
+   this(numKeys, recordsPerKey, false);
+   }
+
+   private Generator(long numKeys, int recordsPerKey, boolean infinite) {
+   this.numKeys = numKeys;
+   this.numRecords = numKeys * recordsPerKey;
+   this.infinite = infinite;
+   }
+
+   public static Generator infinite() {
+   return new Generator(Long.MAX_VALUE, 1, true);
+   }
+
+   @Override
+   public void configure(Configuration parameters) { }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+   return null;
+   }
+
+   @Override
+   public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+   GenericInputSplit[] splits = new 
GenericInputSplit[minNumSplits];
+   for (int i = 0; i < minNumSplits; i++) {
+   splits[i] = new GenericInputSplit(i, minNumSplits);
+   }
+   return splits;
+   }
+
+   @Override
+   public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] 
inputSplits) {
+   return new DefaultInputSplitAssigner(inputSplits);
+   }
+
+   @Override
+   public void open(GenericInputSplit split) throws IOException {
+   this.partitionId = split.getSplitNumber();
+   // total number of partitions
+   int numPartitions = split.getTotalNumberOfSplits();
+
+   // ensure even distribution of records and keys
+   Preconditions.checkArgument(
+   numRecords % numPartitions == 0,
+   "Records cannot be evenly distributed among 
partitions");
+   Preconditions.checkArgument(
+   numKeys % numPartitions == 0,
+   "Keys cannot be evenly distributed among partitions");
+
+   this.recordsPerPartition = numRecords / numPartitions;
+   this.keysPerPartition = numKeys / numPartitions;
+
+   this.recordCnt = 0;
+   }
+
+   @Override
+   public boolean reachedEnd() {
+   return this.recordCnt >= this.recordsPerP

[GitHub] flink pull request #6396: [FLINK-9806][docs] Add canonical link element to d...

2018-07-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6396


---


[GitHub] flink pull request #6407: [FLINK-9877][docs] Add documentation page for diff...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6407#discussion_r205097411
  
--- Diff: docs/dev/stream/operators/joining.md ---
@@ -0,0 +1,286 @@
+---
+title: "Joining"
+nav-id: streaming_joins
+nav-show_overview: true
+nav-parent_id: streaming
+nav-pos: 10
+---
+
+
+* toc
+{:toc}
+
+# Window Join
+A window join will join the elements of two streams that share a common 
key and lie in the same window. These windows can be defined by using a [window 
assigner]({{ site.baseurl}}/dev/stream/operators/windows.html#window-assigners) 
and are evaluated on a union of both streams. This is especially important for 
session window joins, which will be demonstrated below.
+
+The joined elements are then passed to a user-defined `JoinFunction` or 
`FlatJoinFunction` where the user can perform transformations on the joined 
elements.
+
+The general usage always looks like the following:
+
+```java
+stream.join(otherStream)
+    .where(<KeySelector>)
+    .equalTo(<KeySelector>)
+    .window(<WindowAssigner>)
+    .apply(<JoinFunction>)
+```
+
+Some notes on semantics:
+- The creation of pairwise combinations of elements of the two streams 
behaves like an inner-join, meaning elements from one stream will not be 
emitted if they don't have a corresponding element from the other stream to be 
joined with.
+- Those elements that do get joined will have as their timestamp the 
largest timestamp that still lies in the respective window. For example a 
window with `[5, 10)` as its boundaries would result in the joined elements 
having nine as their timestamp.
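
(For illustration only, not part of the diff above: a self-contained sketch of this usage with assumed element types, keys, and timestamps.)

```java
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

/** Illustrative sketch of a tumbling window join; types, keys and timestamps are made up. */
public class TumblingWindowJoinSketch {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // f0 is the join key, f1 doubles as the event timestamp (in milliseconds)
        DataStream<Tuple2<String, Integer>> orangeStream = env
            .fromElements(Tuple2.of("k", 0), Tuple2.of("k", 1), Tuple2.of("k", 6))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
                    return element.f1;
                }
            });

        DataStream<Tuple2<String, Integer>> greenStream = env
            .fromElements(Tuple2.of("k", 0), Tuple2.of("k", 3))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Integer>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Integer> element) {
                    return element.f1;
                }
            });

        orangeStream.join(greenStream)
            // both streams are keyed by the String field f0
            .where(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            })
            .equalTo(new KeySelector<Tuple2<String, Integer>, String>() {
                @Override
                public String getKey(Tuple2<String, Integer> value) {
                    return value.f0;
                }
            })
            // tumbling windows of 2 ms: [0,1], [2,3], [4,5], ...
            .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
            .apply(new JoinFunction<Tuple2<String, Integer>, Tuple2<String, Integer>, String>() {
                @Override
                public String join(Tuple2<String, Integer> first, Tuple2<String, Integer> second) {
                    // only elements sharing a key and a window are paired
                    return first.f1 + "," + second.f1;
                }
            })
            .print();

        env.execute("Tumbling window join sketch");
    }
}
```

With 2 ms tumbling windows, only elements whose timestamps fall into the same window and that share the key are paired, matching the inner-join semantics described above.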
+
+In the following section we are going to give an overview of how different kinds of windows can be used for a window join and what the results of those joins would look like using exemplary scenarios.
+
+## Tumbling Window
+When performing a tumbling window join, all elements with a common key and 
a common tumbling window are joined as pairwise combinations and passed on to 
the user-defined function. Because this behaves like an inner join, elements of 
one stream that do not have elements from another stream in their tumbling 
window are not emitted!
+
+### Example
+
+
+In our example we are defining a tumbling window with the size of 2 
milliseconds, which results in windows of the form `[0,1], [2,3], ...`. The 
image shows the pairwise combinations of all elements in each window which will 
be passed on to the user-defined function. You can also see how in the tumbling 
window `[6,7]` nothing is emitted because no elements from the green stream 
exist to be joined with the orange elements ⑥ and ⑦.
+
+
+
+
+```java
--- End diff --

please use { % highlight java % } syntax
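(That is, presumably wrapping the snippet in `{% highlight java %} ... {% endhighlight %}`, the Liquid tag style used by the rest of the Flink docs, instead of a fenced ```java block.)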


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205087147
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

An infinite flag should be sufficient.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205084257
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205084180
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c

[GitHub] flink pull request #6418: [FLINK-9939][runtime] Mesos: Not setting TMP dirs ...

2018-07-25 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6418

[FLINK-9939][runtime] Mesos: Not setting TMP dirs causes NPE

## What is the purpose of the change

*This fixes a possible NPE when deploying on Mesos.*

## Brief change log

  - *Add a null check to `BootstrapTools.updateTmpDirectoriesInConfiguration(...)` and add a `@Nullable` annotation (see the sketch below).*
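
A rough, simplified sketch of the described guard (illustrative only; it assumes a JSR-305 `@Nullable` on the classpath and uses a plain map in place of Flink's `Configuration` and the `io.tmp.dirs` option):

```java
import javax.annotation.Nullable;

import java.util.HashMap;
import java.util.Map;

/** Simplified illustration of the guard, not the actual Flink code. */
public class TmpDirsGuardSketch {

    private static final Map<String, String> CONFIG = new HashMap<>();

    static void updateTmpDirectoriesInConfiguration(@Nullable String defaultDirs) {
        if (defaultDirs == null) {
            // nothing to set; keep whatever is already configured
            return;
        }
        CONFIG.putIfAbsent("io.tmp.dirs", defaultDirs);
    }

    public static void main(String[] args) {
        updateTmpDirectoriesInConfiguration(null);         // previously a potential NPE
        updateTmpDirectoriesInConfiguration("/tmp/flink");
        System.out.println(CONFIG);                        // {io.tmp.dirs=/tmp/flink}
    }
}
```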

## Verifying this change

This change added tests and can be verified as follows:
  - *Added unit test to assert that `updateTmpDirectoriesInConfiguration` 
can handle `null` values.*
  - *Manually deployed on Mesos.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9939

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6418.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6418


commit eb03ada77db7633d330be7847dbdf9ee801a9bee
Author: gyao 
Date:   2018-07-25T10:23:20Z

[hotfix][tests] Fix checkstyle violations in BootstrapToolsTest.

commit 261d4d7423b9ca179ac0004625f51b7b71655d63
Author: gyao 
Date:   2018-07-25T11:57:34Z

[FLINK-9939][runtime] Add null check before setting tmp dirs in config.




---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205082457
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

You are right. Do you think we need the changes in the `Generator` source 
or shall I maybe just introduce a flag to make the source infinite?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205081350
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

But the loop executes `wait_job_running ${JOB_ID}`, so don't we know it 
_actually_ restarted?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205080760
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

We've verified that it started being recovered; we check for the string "Recovered SubmittedJobGraph". It might happen that it won't be able to run, e.g. it might not be able to allocate resources.

I think with cancel we would not cancel e.g. this one: 
https://issues.apache.org/jira/browse/FLINK-9635. Am I correct?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205078058
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

We've already verified that the job is properly restarted in the 
jobmanager-kill loop, so this should only be about shutting down the job.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205076492
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

Probably we could, just for testing the HA. This way, though, we ensure it completes successfully after all the restarts.


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205069073
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/DataSetAllroundTestProgram.java
 ---
@@ -66,14 +59,21 @@ public static void main(String[] args) throws Exception 
{
ParameterTool params = ParameterTool.fromArgs(args);
int loadFactor = 
Integer.parseInt(params.getRequired("loadFactor"));
String outputPath = params.getRequired("outputPath");
+   String source = params.get("source", null);
--- End diff --

source parameter is not documented in the javadocs


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070681
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
--- End diff --

same as below


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070467
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070430
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
--- End diff --

move into shared `common-ha.sh`


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205072007
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c> ${TEST_DATA_DIR}/control/test.txt
--- End diff --

Couldn't we simply cancel the job?


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070024
  
--- Diff: 
flink-end-to-end-tests/flink-dataset-allround-test/src/main/java/org/apache/flink/batch/tests/Generator.java
 ---
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.batch.tests;
+
+import org.apache.flink.api.common.io.DefaultInputSplitAssigner;
+import org.apache.flink.api.common.io.InputFormat;
+import org.apache.flink.api.common.io.statistics.BaseStatistics;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.io.GenericInputSplit;
+import org.apache.flink.core.io.InputSplitAssigner;
+import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.Preconditions;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * InputFormat that generates a deterministic DataSet of Tuple2(String, 
Integer)
+ * 
+ * String: key, can be repeated.
+ * Integer: uniformly distributed int between 0 and 127
+ * 
+ *
+ * If control path was provided, as long as this file is empty dummy 
elements with value equal to -1 will be emitted.
+ */
+public class Generator implements InputFormat<Tuple2<String, Integer>, GenericInputSplit> {
+
+   // total number of records
+   private final long numRecords;
+   // total number of keys
+   private final long numKeys;
+
+   // records emitted per partition
+   private long recordsPerPartition;
+   // number of keys per partition
+   private long keysPerPartition;
+
+   // number of currently emitted records
+   private long recordCnt;
+
+   // id of current partition
+   private int partitionId;
+
+   private final String path;
+
+   private boolean hasStarted = true;
+
+   public Generator(long numKeys, int recordsPerKey) {
+   this(numKeys, recordsPerKey, null);
+   }
+
+   public Generator(long numKeys, int recordsPerKey, String controlPath) {
+   this.numKeys = numKeys;
+   this.numRecords = numKeys * recordsPerKey;
+   this.path = controlPath;
+   }
+
+   @Override
+   public void configure(Configuration parameters) { }
+
+   @Override
+   public BaseStatistics getStatistics(BaseStatistics cachedStatistics) {
+   return null;
+   }
+
+   @Override
+   public GenericInputSplit[] createInputSplits(int minNumSplits) {
+
+   GenericInputSplit[] splits = new 
GenericInputSplit[minNumSplits];
+   for (int i = 0; i < minNumSplits; i++) {
+   splits[i] = new GenericInputSplit(i, minNumSplits);
+   }
+   return splits;
+   }
+
+   @Override
+   public InputSplitAssigner getInputSplitAssigner(GenericInputSplit[] 
inputSplits) {
+   return new DefaultInputSplitAssigner(inputSplits);
+   }
+
+   @Override
+   public void open(GenericInputSplit split) throws IOException {
+   this.partitionId = split.getSplitNumber();
+   // total number of partitions
+   int numPartitions = split.getTotalNumberOfSplits();
+
+   // ensure even distribution of records and keys
+   Preconditions.checkArgument(
+   numRecords % numPartitions == 0,
+   "Records cannot be evenly distributed among 
partitions");
+   Preconditions.checkArgument(
+   numKeys % numPartitions == 0,
+   "Keys cannot be evenly distributed among partitions");
+
+   this.recordsPerPartition = numRecords / numPartitions;
+   this.keysPerPartition = numKeys / numPartitions;
+
+   this.recordCnt = 0;
+
+   if (this.path != null) {
+   this.hasStarted = false;
+   }
+   }
+
+   @

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205071341
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
+local OUTPUT=$FLINK_DIR/log/*.out
+local JM_FAILURES=$1
+local EXIT_CODE=0
+
+# verify that we have no alerts
+if ! [ `cat ${OUTPUT} | wc -l` -eq 0 ]; then
+echo "FAILURE: Alerts found at the general purpose DataSet job."
+EXIT_CODE=1
+fi
+
+# checks that all apart from the first JM recover the failed jobgraph.
+if ! [ `grep -r --include '*standalonesession*.log' 'Recovered 
SubmittedJobGraph' "${FLINK_DIR}/log/" | cut -d ":" -f 1 | uniq | wc -l` -eq 
${JM_FAILURES} ]; then
+echo "FAILURE: A JM did not take over."
+EXIT_CODE=1
+fi
+
+if [[ $EXIT_CODE != 0 ]]; then
+echo "One or more tests FAILED."
+exit $EXIT_CODE
+fi
+}
+
+function jm_watchdog() {
+local EXPECTED_JMS=$1
+local IP_PORT=$2
+
+while true; do
+local RUNNING_JMS=`jps | grep 'StandaloneSessionClusterEntrypoint' 
| wc -l`;
+local MISSING_JMS=$((EXPECTED_JMS-RUNNING_JMS))
+for (( c=0; c

[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6415#discussion_r205070634
  
--- Diff: flink-end-to-end-tests/test-scripts/test_ha_dataset.sh ---
@@ -0,0 +1,139 @@
+#!/usr/bin/env bash

+
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+source "$(dirname "$0")"/common.sh
+

+TEST_PROGRAM_JAR=${END_TO_END_DIR}/flink-dataset-allround-test/target/DataSetAllroundTestProgram.jar
+
+JM_WATCHDOG_PID=0
+
+# flag indicating if we have already cleared up things after a test
+CLEARED=0
+
+function stop_cluster_and_watchdog() {
+if [ ${CLEARED} -eq 0 ]; then
+
+if ! [ ${JM_WATCHDOG_PID} -eq 0 ]; then
+echo "Killing JM watchdog @ ${JM_WATCHDOG_PID}"
+kill ${JM_WATCHDOG_PID} 2> /dev/null
+wait ${JM_WATCHDOG_PID} 2> /dev/null
+fi
+
+CLEARED=1
+fi
+}
+
+function verify_logs() {
--- End diff --

move into shared `common-ha.sh`, and parameterize the strings to search for so it's also applicable to the streaming test.


---


[GitHub] flink pull request #6417: [FLINK-9913][runtime] Improve output serialization...

2018-07-25 Thread zhijiangW
GitHub user zhijiangW opened a pull request:

https://github.com/apache/flink/pull/6417

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter

## What is the purpose of the change

*This pull request changes `RecordWriter` to serialize an output record only 
once when it is emitted to multiple target channels, rather than serializing it 
as many times as the number of selected channels.*
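
A toy illustration of the idea only, not the actual `RecordWriter` code (the 
class and the byte-array handling below are made up for the example):

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

// Serialize the record once and reuse the resulting bytes for every target
// channel, instead of serializing again for each channel.
public class SerializeOnceSketch {
    public static void main(String[] args) {
        String record = "some-record";
        byte[] serializedOnce = record.getBytes(StandardCharsets.UTF_8); // stand-in for the serializer output

        ByteArrayOutputStream[] channels = {new ByteArrayOutputStream(), new ByteArrayOutputStream()};
        for (ByteArrayOutputStream channel : channels) {
            channel.write(serializedOnce, 0, serializedOnce.length); // copy the same bytes to each channel
        }
        System.out.println("Wrote " + serializedOnce.length + " bytes to " + channels.length + " channels.");
    }
}
```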

## Brief change log

  - *Only one `RecordSerializer` is created for all the output channels in 
`RecordWriter`*
  - *Restructure the `emit`, `broadcastEmit`, and `randomEmit` code paths in 
`RecordWriter`*
  - *Restructure the interface methods in `RecordSerializer`*

## Verifying this change

This change is already covered by existing tests, such as 
*SpanningRecordSerializationTest*, etc.

And adds new tests in `RecordWriterTest` to verify:

  - *The serialization results are correct by `RecordWriter#emit` with 
`BroadcastPartitioner`*
  - *The serialization results are correct by `RecordWriter#broadcastEmit` 
directly*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (yes)
  - The runtime per-record code paths (performance sensitive): (yes)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (no)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zhijiangW/flink FLINK-9913

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6417.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6417


commit 109ddb37abafcea28478b90cda10b965e0c399d5
Author: Zhijiang 
Date:   2018-07-25T05:45:23Z

[FLINK-9913][runtime] Improve output serialization only once in RecordWriter




---


[GitHub] flink pull request #6416: [FLINK-9942][rest] Guard handlers against null fie...

2018-07-25 Thread zentol
GitHub user zentol opened a pull request:

https://github.com/apache/flink/pull/6416

[FLINK-9942][rest] Guard handlers against null fields

## What is the purpose of the change

This PR prevents some NPEs that could arise if fields in the request are set 
to null or are omitted.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/zentol/flink 9942

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6416.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6416


commit 3e9046864d566ef6aa0d9af7a09771ebb5fe556b
Author: zentol 
Date:   2018-07-25T06:39:29Z

[FLINK-9942][rest] Guard handlers against null fields




---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread cjolif
Github user cjolif commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205051308
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/Elasticsearch6ApiCallBridge.java
 ---
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchApiCallBridge;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import org.apache.flink.util.Preconditions;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.bulk.BackoffPolicy;
+import org.elasticsearch.action.bulk.BulkItemResponse;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestClient;
+import org.elasticsearch.client.RestHighLevelClient;
+import org.elasticsearch.common.unit.TimeValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Implementation of {@link ElasticsearchApiCallBridge} for Elasticsearch 
6 and later versions.
+ */
+public class Elasticsearch6ApiCallBridge implements 
ElasticsearchApiCallBridge {
+
+   private static final long serialVersionUID = -5222683870097809633L;
+
+   private static final Logger LOG = 
LoggerFactory.getLogger(Elasticsearch6ApiCallBridge.class);
+
+   /**
+* User-provided HTTP Host.
+*/
+   private final List httpHosts;
+
+   Elasticsearch6ApiCallBridge(List httpHosts) {
+   Preconditions.checkArgument(httpHosts != null && 
!httpHosts.isEmpty());
+   this.httpHosts = httpHosts;
+   }
+
+   @Override
+   public RestHighLevelClient createClient(Map 
clientConfig) {
+   RestHighLevelClient rhlClient =
--- End diff --

It might have been good to support:
- a context path / path prefix in addition to the host
- login/password for protected Elasticsearch instances

That's OK to leave out as long as the user can achieve it by subclassing. 
Maybe to make subclassing easier there should be two methods.

Keep the public `createClient` one that returns the `RHLClient`, and add a 
protected method `createRestClientBuilder` which returns the 
`RestClientBuilder`. This way one can just redefine the protected method and 
let the public one handle the actual `RHLClient` instantiation from the 
`RestClientBuilder` created by the protected method.
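
A rough sketch of that split, assuming ES 6.x client classes (the class name 
and exact signatures here are illustrative, not the code in this PR):

```java
import java.util.List;
import java.util.Map;

import org.apache.http.HttpHost;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.elasticsearch.client.RestHighLevelClient;

public class Elasticsearch6ApiCallBridgeSketch {

    private final List<HttpHost> httpHosts;

    Elasticsearch6ApiCallBridgeSketch(List<HttpHost> httpHosts) {
        this.httpHosts = httpHosts;
    }

    // public entry point: builds the RHLClient from whatever builder the
    // protected method produced
    public RestHighLevelClient createClient(Map<String, String> clientConfig) {
        return new RestHighLevelClient(createRestClientBuilder(clientConfig));
    }

    // subclasses override this to add a path prefix, credentials, etc.
    protected RestClientBuilder createRestClientBuilder(Map<String, String> clientConfig) {
        return RestClient.builder(httpHosts.toArray(new HttpHost[0]));
    }
}
```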


---


[GitHub] flink pull request #6414: [hotfix] Enrich exception message

2018-07-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6414


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037835
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
 ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - 
%m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, 
testlogger
--- End diff --

Removed.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205037795
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it 
you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

Actually, we decided to move examples out of the test code. Removing this.


---


[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...

2018-07-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6388


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205031956
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+   if (node == null) {
+   Settings settings = Settings.builder()
+   .put("cluster.name", clusterName)
+   .put("http.enabled", false)
+   .put("path.home", tmpDataFolder.getParent())
+   .put("path.data", 
tmpDataFolder.getAbsolutePath())
+   .put(NetworkModule.HTTP_TYPE_KEY, 
Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

You're right, removed.


---


[GitHub] flink pull request #6372: [Flink-9353] Tests running per job standalone clus...

2018-07-25 Thread dawidwys
Github user dawidwys commented on a diff in the pull request:

https://github.com/apache/flink/pull/6372#discussion_r205030130
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -31,6 +31,12 @@ You can also run tests individually via
 $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh 
your_test.sh arg1 arg2
 ```
 
+### Kubernetes test
+
+Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running 
minikube cluster. Right now we cannot
--- End diff --

Actually it assumes Minikube, not an arbitrary Kubernetes cluster. The problem 
is that we use a Minikube-specific mechanism to make the Docker images we build 
available to Minikube's Docker daemon. To run it on any Kubernetes cluster we 
would need to upload the images to a Docker registry and point the cluster to 
it.

The specific Minikube command is: `eval $(minikube docker-env)`
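
A hedged sketch of the two approaches (image and registry names are made up):

```bash
# 1) Minikube: build the image directly into the cluster's Docker daemon.
eval $(minikube docker-env)
docker build -t flink-job-cluster:latest .

# 2) Generic Kubernetes: push the image to a registry the cluster can pull
#    from and reference that image in the deployment.
docker tag flink-job-cluster:latest registry.example.com/flink-job-cluster:latest
docker push registry.example.com/flink-job-cluster:latest
```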


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-25 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r205029480
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end 
test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
--- End diff --

Added a loop to wait until the Elasticsearch node is really running.
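
A rough sketch of such a wait loop (port, URL and retry count are assumptions, 
not necessarily what the test script uses):

```bash
for i in $(seq 1 30); do
    if curl -s "http://localhost:9200" > /dev/null; then
        echo "Elasticsearch node is up."
        break
    fi
    echo "Waiting for the Elasticsearch node to start..."
    sleep 1
done
```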


---


[GitHub] flink pull request #6415: [FLINK-8974] Run all-round DataSet job with failur...

2018-07-25 Thread dawidwys
GitHub user dawidwys opened a pull request:

https://github.com/apache/flink/pull/6415

[FLINK-8974] Run all-round DataSet job with failures in HA mode

Added all-round DataSet end-to-end test that runs in HA mode. It verifies 
the job is restarted correctly after job manager failure.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/dawidwys/flink FLINK-8974

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6415.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6415


commit 4a9a58505ed931c445450c210e2436c910f534b8
Author: Dawid Wysakowicz 
Date:   2018-07-25T08:45:09Z

[FLINK-8974] Run all-round DataSet job with failures in HA mode




---


[GitHub] flink pull request #6392: [FLINK-9694][table] Fix NPE in CRowSerializerConfi...

2018-07-25 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6392


---


[GitHub] flink pull request #6414: [hotfix] Enrich exception message

2018-07-25 Thread TisonKun
GitHub user TisonKun opened a pull request:

https://github.com/apache/flink/pull/6414

[hotfix] Enrich exception message

## What is the purpose of the change

I once got a failure in ExecutionGraphTestUtils#waitUntilJobStatus; it throws a 
`TimeoutException` without a detailed message. I'd like it to include the 
expected status and the actual one.
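
A hedged sketch of the kind of message enrichment meant here (the helper and 
status values are made up, not the actual `ExecutionGraphTestUtils` code):

```java
import java.util.concurrent.TimeoutException;

public class EnrichedTimeoutSketch {
    // include both the expected and the actual status in the exception message
    static void failWithStatus(String expectedStatus, String actualStatus) throws TimeoutException {
        throw new TimeoutException(
            "Job did not reach status " + expectedStatus + " in time, current status: " + actualStatus);
    }

    public static void main(String[] args) {
        try {
            failWithStatus("RUNNING", "CREATED");
        } catch (TimeoutException e) {
            System.out.println(e.getMessage());
        }
    }
}
```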

## Brief change log

Trivial work.


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/TisonKun/flink patch-1

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6414.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6414


commit df17b34fd94cb89fa8073508ea920991c2f17134
Author: 陈梓立 
Date:   2018-07-25T08:11:06Z

[hotfix] Enrich exception message




---


[GitHub] flink pull request #6372: [Flink 9353] Tests running per job standalone clus...

2018-07-25 Thread aljoscha
Github user aljoscha commented on a diff in the pull request:

https://github.com/apache/flink/pull/6372#discussion_r205014402
  
--- Diff: flink-end-to-end-tests/README.md ---
@@ -31,6 +31,12 @@ You can also run tests individually via
 $ FLINK_DIR= flink-end-to-end-tests/run-single-test.sh 
your_test.sh arg1 arg2
 ```
 
+### Kubernetes test
+
+Kubernetes test (test_kubernetes_embedded_job.sh) assumes a running 
minikube cluster. Right now we cannot
--- End diff --

This should be "assumes a running Kubernetes cluster". And maybe point 
towards minikube as one easy way to get that locally.


---


[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...

2018-07-25 Thread Lemonjing
GitHub user Lemonjing reopened a pull request:

https://github.com/apache/flink/pull/6411

[FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to 
ensure CI stability

## What is the purpose of the change
This pull request updates the Scala API's `ScalaCsvOutputFormat` to increase 
CI stability.

## Brief change log
Add a flush call before closing in `ScalaCsvOutputFormat` for the Scala API.

## Verifying this change
Added `ScalaCsvOutputFormatTest` and the test passed.

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
Does this pull request introduce a new feature? (yes / **no**)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
**not documented**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Lemonjing/flink csv-close-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6411


commit 95a9b60b1ece7d248755d92868e682c4ee0fd334
Author: lemonjing <932191671@...>
Date:   2018-07-25T07:06:10Z

[FLINK-9941][ScalaAPI] flush in ScalaCsvOutputFormat before closing, to 
ensure CI stability




---


[GitHub] flink pull request #6413: [FLINK-8993] [tests] Let general purpose DataStrea...

2018-07-25 Thread tzulitai
GitHub user tzulitai opened a pull request:

https://github.com/apache/flink/pull/6413

[FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer 
via type extraction

## What is the purpose of the change

The general purpose DataStream job previously only used the `KryoSerializer` 
via a custom state serializer. This PR allows the job to also use the 
`KryoSerializer` via Flink's type extraction.

## Brief change log

- Adapt the state builders to accept a state class instead of a state type 
serializer.
- Let `DataStreamAllroundTestJob` specify state serializers via state classes 
instead of a direct custom serializer (a short illustration of the difference 
follows below).
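
A hedged illustration of the difference using Flink's state descriptor API 
(simplified, not the actual test job code):

```java
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class StateDescriptorSketch {
    public static void main(String[] args) {
        // supplying the class lets Flink's type extraction pick the serializer
        // (KryoSerializer for generic types the extractor cannot analyze)
        ValueStateDescriptor<String> byClass =
            new ValueStateDescriptor<>("state-by-class", String.class);

        // supplying a TypeSerializer directly bypasses type extraction
        ValueStateDescriptor<String> bySerializer =
            new ValueStateDescriptor<>("state-by-serializer", StringSerializer.INSTANCE);

        System.out.println(byClass.getName() + " / " + bySerializer.getName());
    }
}
```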

## Verifying this change

This is an extension to the existing end-to-end tests 
(`test-streaming-savepoint.sh`).

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tzulitai/flink FLINK-8993

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6413.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6413


commit 428d5427227343479b6d63daf7fced8f1bf9a69c
Author: Tzu-Li (Gordon) Tai 
Date:   2018-07-25T06:58:46Z

[FLINK-8993] [tests] Let general purpose DataStream job uses KryoSerializer 
via type extraction




---


[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...

2018-07-25 Thread Lemonjing
Github user Lemonjing closed the pull request at:

https://github.com/apache/flink/pull/6411


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204756208
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
 
/**
-* Creates an Elasticsearch {@link Client}.
+* Creates an Elasticsearch client implementing {@link AutoCloseable}.
 *
 * @param clientConfig The configuration to use when constructing the 
client.
 * @return The created client.
 */
-   Client createClient(Map clientConfig);
+   public abstract AutoCloseable createClient(Map 
clientConfig);
+
+   public abstract BulkProcessor.Builder 
createBulkProcessorBuilder(AutoCloseable client, BulkProcessor.Listener 
listener);
--- End diff --

No docs here?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204999073
  
--- Diff: flink-end-to-end-tests/run-nightly-tests.sh ---
@@ -96,6 +96,7 @@ run_test "Local recovery and sticky scheduling end-to-end 
test" "$END_TO_END_DIR
 run_test "Elasticsearch (v1.7.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 1 
https://download.elastic.co/elasticsearch/elasticsearch/elasticsearch-1.7.1.tar.gz";
 run_test "Elasticsearch (v2.3.5) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 2 
https://download.elastic.co/elasticsearch/release/org/elasticsearch/distribution/tar/elasticsearch/2.3.5/elasticsearch-2.3.5.tar.gz";
 run_test "Elasticsearch (v5.1.2) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 5 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-5.1.2.tar.gz";
+run_test "Elasticsearch (v6.3.1) sink end-to-end test" 
"$END_TO_END_DIR/test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz";
--- End diff --

The test is not runnable on my machine.

```
Elasticsearch node is not running.
grep: /Users/twalthr/flink/flink/build-target/log/*.out: No such file or 
directory

[FAIL] './test-scripts/test_streaming_elasticsearch.sh 6 
https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.3.1.tar.gz'
 failed after 0 minutes and 18 seconds! Test exited with exit code 1
```

The test exits before Elasticsearch has actually started. Killing also does 
not work: an Elasticsearch process is still running afterwards.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991878
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch/EmbeddedElasticsearchNodeEnvironmentImpl.java
 ---
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSinkITCase;
+
+import org.elasticsearch.client.Client;
+import org.elasticsearch.common.network.NetworkModule;
+import org.elasticsearch.common.settings.Settings;
+import org.elasticsearch.node.InternalSettingsPreparer;
+import org.elasticsearch.node.Node;
+import org.elasticsearch.plugins.Plugin;
+import org.elasticsearch.transport.Netty4Plugin;
+
+import java.io.File;
+import java.util.Collections;
+
+/**
+ * Implementation of {@link EmbeddedElasticsearchNodeEnvironment} for 
Elasticsearch 6.
+ * Will be dynamically loaded in {@link ElasticsearchSinkITCase} for 
integration tests.
+ */
+public class EmbeddedElasticsearchNodeEnvironmentImpl implements 
EmbeddedElasticsearchNodeEnvironment {
+
+   private Node node;
+
+   @Override
+   public void start(File tmpDataFolder, String clusterName) throws 
Exception {
+   if (node == null) {
+   Settings settings = Settings.builder()
+   .put("cluster.name", clusterName)
+   .put("http.enabled", false)
+   .put("path.home", tmpDataFolder.getParent())
+   .put("path.data", 
tmpDataFolder.getAbsolutePath())
+   .put(NetworkModule.HTTP_TYPE_KEY, 
Netty4Plugin.NETTY_HTTP_TRANSPORT_NAME)
--- End diff --

Are these values still valid? I thought we are not relying on Netty anymore 
with the rest client?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758828
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java
 ---
@@ -176,7 +175,7 @@ public void setDelayMillis(long delayMillis) {
private AtomicLong numPendingRequests = new AtomicLong(0);
 
/** Elasticsearch client created using the call bridge. */
-   private transient Client client;
+   private transient AutoCloseable client;
--- End diff --

Same here. Why not parameterize the class and be type safe?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204758434
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -64,13 +65,15 @@
 * @param builder the {@link BulkProcessor.Builder} to configure.
 * @param flushBackoffPolicy user-provided backoff retry settings 
({@code null} if the user disabled backoff retries).
 */
-   void configureBulkProcessorBackoff(
+   public abstract void configureBulkProcessorBackoff(
BulkProcessor.Builder builder,
@Nullable ElasticsearchSinkBase.BulkFlushBackoffPolicy 
flushBackoffPolicy);
 
/**
 * Perform any necessary state cleanup.
 */
-   void cleanup();
+   public void cleanup() {
--- End diff --

Use Java 8 defaults and let this class stay an interface?
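
A minimal sketch of that alternative, with simplified signatures (this is an 
assumption about the shape, not the actual bridge interface):

```java
import java.io.Serializable;
import java.util.Map;

// The bridge stays an interface; cleanup() gets a Java 8 default method so
// implementations that have nothing to clean up do not need to override it.
public interface ElasticsearchApiCallBridgeSketch extends Serializable {

    AutoCloseable createClient(Map<String, String> clientConfig);

    default void cleanup() {
        // nothing to clean up by default
    }
}
```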


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993234
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
+   final String index = "transport-client-test-index";
+
+   final StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+
+   DataStreamSource> source = 
env.addSource(new SourceSinkDataTestKit.TestDataSourceFunction());
+
+   Map userConfig = new HashMap<>();
+   // This instructs the sink to emit after every element, 
otherwise they would be buffered
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+
+   
source.addSink(createElasticsearchSinkForEmbeddedNode(userConfig,
+   new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction(index)));
+
+   env.execute("Elasticsearch RestHighLevelClient Test");
+
+   // verify the results
+   Client client = embeddedNodeEnv.getClient();
+   SourceSinkDataTestKit.verifyProducedSinkData(client, index);
+
+   client.close();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is {@code null}.
+*/
+   public void runNullTransportClientTest() throws Exception {
+   try {
+   Map userConfig = new HashMap<>();
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   createElasticsearchSink6(userConfig, null, new 
SourceSinkDataTestKit.TestElasticsearchSinkFunction("test"));
+   } catch (IllegalArgumentException expectedException) {
+   // test passes
+   return;
+   }
+
+   fail();
+   }
+
+   /**
+* Tests that the Elasticsearch sink fails eagerly if the provided list 
of transport addresses is empty.
+*/
+   public void runEmptyTransportClientTest() throws Exception {
+   try {
+   Map userConfig = new HashMap<>();
+   
userConfig.put(ElasticsearchSinkBase.CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS, "1");
+   createElasticsearchSink6(userConfig,
+   Collect

[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204994308
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/resources/log4j-test.properties
 ---
@@ -0,0 +1,27 @@

+
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#  http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.

+
+
+log4j.rootLogger=INFO, testlogger
+
+log4j.appender.testlogger=org.apache.log4j.ConsoleAppender
+log4j.appender.testlogger.target=System.err
+log4j.appender.testlogger.layout=org.apache.log4j.PatternLayout
+log4j.appender.testlogger.layout.ConversionPattern=%-4r [%t] %-5p %c %x - 
%m%n
+
+# suppress the irrelevant (wrong) warnings from the netty channel handler
+log4j.logger.org.jboss.netty.channel.DefaultChannelPipeline=ERROR, 
testlogger
--- End diff --

Still necessary?


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204990734
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
--- End diff --

Add `@PublicEvolving`


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204775927
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
--- End diff --

Update docs here.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204992515
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSinkITCase.java
 ---
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkTestBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.testutils.SourceSinkDataTestKit;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.client.Client;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.net.InetSocketAddress;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+/**
+ * IT cases for the {@link ElasticsearchSink}.
+ */
+public class ElasticsearchSinkITCase extends ElasticsearchSinkTestBase {
+
+   /**
+* Tests that the Elasticsearch sink works properly using a {@link 
RestHighLevelClient}.
+*/
+   public void runTransportClientTest() throws Exception {
--- End diff --

There are no `@Test` annotations to run the tests. We should also rename the 
methods to `testXXX` as we usually do. The super class method names should be 
updated as well, since `runTransportClientTest` is no longer accurate.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991006
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
+import org.elasticsearch.action.bulk.BulkProcessor;
+import org.elasticsearch.client.RestHighLevelClient;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Elasticsearch 6.x sink that requests multiple {@link ActionRequest 
ActionRequests}
+ * against a cluster for each incoming element.
+ *
+ * The sink internally uses a {@link RestHighLevelClient} to 
communicate with an Elasticsearch cluster.
+ * The sink will fail if no cluster can be connected to using the provided 
transport addresses passed to the constructor.
+ *
+ * The {@link Map} passed to the constructor is used to create the 
{@code TransportClient}. The config keys can be found
+ * in the https://www.elastic.io";>Elasticsearch 
documentation. An important setting is {@code cluster.name},
+ * which should be set to the name of the cluster that the sink should 
emit to.
+ *
+ * Internally, the sink will use a {@link BulkProcessor} to send {@link 
ActionRequest ActionRequests}.
+ * This will buffer elements before sending a request to the cluster. The 
behaviour of the
+ * {@code BulkProcessor} can be configured using these config keys:
+ * 
+ *{@code bulk.flush.max.actions}: Maximum amount of elements to 
buffer
+ *{@code bulk.flush.max.size.mb}: Maximum amount of data (in 
megabytes) to buffer
+ *{@code bulk.flush.interval.ms}: Interval at which to flush data 
regardless of the other two
+ *   settings in milliseconds
+ * 
+ *
+ * You also have to provide an {@link ElasticsearchSinkFunction}. This 
is used to create multiple
+ * {@link ActionRequest ActionRequests} for each incoming element. See the 
class level documentation of
+ * {@link ElasticsearchSinkFunction} for an example.
+ *
+ * @param  Type of the elements handled by this sink
+ */
+public class ElasticsearchSink extends ElasticsearchSinkBase {
+
+   private static final long serialVersionUID = 1L;
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+* @param httpHosts The list of {@HttpHost} to which the {@link 
RestHighLevelClient} connects to.
+*/
+   public ElasticsearchSink(Map userConfig, List 
httpHosts, ElasticsearchSinkFunction elasticsearchSinkFunction) {
+
+   this(userConfig, httpHosts, elasticsearchSinkFunction, new 
NoOpFailureHandler());
+   }
+
+   /**
+* Creates a new {@code ElasticsearchSink} that connects to the cluster 
using a {@link RestHighLevelClient}.
+*
+* @param elasticsearchSinkFunction This is used to generate multiple 
{@link ActionRequest} from the incoming element
+* @param failureHandler This is used to handle failed {@link 
ActionRequest}
+* @param httpHosts The list of {@HttpHost} to which the {@link 
RestHighLevelClient} connects to.
--- End diff --

Fix two invalid Javadocs.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204993809
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/test/java/org/apache/flink/streaming/connectors/elasticsearch6/examples/ElasticsearchSinkExample.java
 ---
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RuntimeContext;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
+import 
org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.index.IndexRequest;
+import org.elasticsearch.client.Requests;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * This example shows how to use the Elasticsearch Sink. Before running it 
you must ensure that
+ * you have a cluster named "elasticsearch" running or change the name of 
cluster in the config map.
+ */
+public class ElasticsearchSinkExample {
--- End diff --

We should add tests for our examples.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204991076
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch6/src/main/java/org/apache/flink/streaming/connectors/elasticsearch6/ElasticsearchSink.java
 ---
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.elasticsearch6;
+
+import 
org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
+import 
org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
+import 
org.apache.flink.streaming.connectors.elasticsearch.util.NoOpFailureHandler;
+
+import org.apache.http.HttpHost;
+import org.elasticsearch.action.ActionRequest;
+import org.elasticsearch.action.DocWriteRequest;
--- End diff --

Remove unused import.


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204757871
  
--- Diff: 
flink-connectors/flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchApiCallBridge.java
 ---
@@ -39,23 +38,25 @@
  * exactly one instance of the call bridge, and state cleanup is performed 
when the sink is closed.
  */
 @Internal
-public interface ElasticsearchApiCallBridge extends Serializable {
+public abstract class ElasticsearchApiCallBridge implements Serializable {
--- End diff --

Parameterize the class instead of using `AutoCloseable` as a synonym for a 
client that implements this interface. This avoids manual casting in 
subclasses.
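
A minimal sketch of the suggested parameterization, with simplified signatures 
(the generic parameter and class name here are illustrative assumptions):

```java
import java.io.Serializable;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkProcessor;

// C stands for the concrete client type (e.g. RestHighLevelClient), so
// implementations get a typed client instead of casting from AutoCloseable.
public abstract class ElasticsearchApiCallBridgeSketch<C extends AutoCloseable> implements Serializable {

    public abstract C createClient(Map<String, String> clientConfig);

    public abstract BulkProcessor.Builder createBulkProcessorBuilder(C client, BulkProcessor.Listener listener);
}
```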


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204750192
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -138,6 +143,31 @@ input.addSink(new ElasticsearchSink<>(config, 
transportAddresses, new Elasticsea
 }
 }));{% endhighlight %}
 
+
+{% highlight java %}
+DataStream input = ...;
+
+List httpHost = new ArrayList<>();
+httpHosts.add(new HttpHost("127.0.0.1", 9200, "http"));
+httpHosts.add(new HttpHost("10.2.3.1", 9200, "http"));
+
+input.addSink(new ElasticsearchSink<>(httpHosts, new 
ElasticsearchSinkFunction() {
--- End diff --

Add an example for the user config as well to be in sync with the examples 
of other versions? Because the following paragraph mentions: 

> Especially important is the `cluster.name` parameter

Btw, could you also add imports to your examples? I just started doing this 
with my code examples to make it easier for people to find the used classes 
(see 
[here](https://ci.apache.org/projects/flink/flink-docs-master/dev/java_lambdas.html)).


---


[GitHub] flink pull request #6391: [FLINK-9885] [FLINK-8101] Finalize Elasticsearch 6...

2018-07-24 Thread twalthr
Github user twalthr commented on a diff in the pull request:

https://github.com/apache/flink/pull/6391#discussion_r204752713
  
--- Diff: docs/dev/connectors/elasticsearch.md ---
@@ -190,9 +220,30 @@ input.addSink(new ElasticsearchSink(config, 
transportAddresses, new Elasticsearc
 }))
 {% endhighlight %}
 
+
+{% highlight scala %}
+val input: DataStream[String] = ...
+
+val httpHosts = new java.util.ArrayList[HttpHost]
+httpHosts.add(new HttpHost("127.0.0.1", 9300, "http"))
+httpHosts.add(new HttpHost("10.2.3.1", 9300, "http"))
+
+input.addSink(new ElasticsearchSink(httpHosts, new 
ElasticsearchSinkFunction[String] {
+  def createIndexRequest(element: String): IndexRequest = {
+val json = new java.util.HashMap[String, String]
+json.put("data", element)
+
+return Requests.indexRequest()
+.index("my-index")
+.type("my-type")
+.source(json)
+  }
+}))
+{% endhighlight %}
+
 
 
-Note how a `Map` of `String`s is used to configure the `ElasticsearchSink`.
+Note how `TransportClient` based version use a `Map` of `String`s is used 
to configure the `ElasticsearchSink`.
--- End diff --

Remove "is used to"?


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204999405
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
--- End diff --

having a local variable here seems a bit redundant, since we always adjust 
it right afterwards.


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204996330
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
--- End diff --

just a minor nitpick here: `adjustmentEndTimeNanos` would be better named 
`adjustedEndTimeNanos`


---


[GitHub] flink pull request #6231: [FLINK-9694] Potentially NPE in CompositeTypeSeria...

2018-07-24 Thread yanghua
Github user yanghua closed the pull request at:

https://github.com/apache/flink/pull/6231


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tzulitai
Github user tzulitai commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204998677
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,69 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   maxNumberOfRecordsPerFetch = 
adaptRecordsToRead(runLoopTimeNanos, fetchedRecords.size(), 
recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
--- End diff --

If this is a method without side effects on the fields of the 
`ShardConsumer`, it might be better to make it static and pass in 
the current `maxNumberOfRecordsPerFetch` as an argument.
This makes it clearer that it is only a utility method for calculating the 
number of records to read.
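
A rough sketch of that static variant; the constants are assumed to come from 
the surrounding class, and the caller would assign the returned value back to 
`maxNumberOfRecordsPerFetch`:

```
// Sketch only: KINESIS_SHARD_BYTES_PER_SECOND_LIMIT and
// ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX are assumed to be
// available from the surrounding ShardConsumer / consumer configuration.
private static int adaptRecordsToRead(
        long runLoopTimeNanos,
        int numRecords,
        long recordBatchSizeBytes,
        int currentMaxRecordsPerFetch,
        boolean useAdaptiveReads) {
    if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 0) {
        long averageRecordSizeBytes = recordBatchSizeBytes / numRecords;
        // loop frequency in Hz, derived from the measured nanoseconds per pass
        double loopFrequencyHz = 1_000_000_000.0d / runLoopTimeNanos;
        // stay under the per-shard read limit of ~2 MB/s
        double bytesPerRead = KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
        int adapted = (int) (bytesPerRead / averageRecordSizeBytes);
        return Math.min(adapted, ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
    }
    return currentMaxRecordsPerFetch;
}
```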


---


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-24 Thread buptljy
Github user buptljy closed the pull request at:

https://github.com/apache/flink/pull/6412


---


[GitHub] flink pull request #6412: [FLINK-9941] Flush in ScalaCsvOutputFormat before ...

2018-07-24 Thread buptljy
GitHub user buptljy opened a pull request:

https://github.com/apache/flink/pull/6412

[FLINK-9941] Flush in ScalaCsvOutputFormat before close

## What is the purpose of the change
- Flush in ScalaCsvOutputFormat before close. This is already done in 
org.apache.flink.api.java.io.CsvOutputFormat.
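
A minimal sketch of such a flush-before-close in the format's `close()` 
(assuming a writer field named `wrt`, mirroring the Java `CsvOutputFormat`):

```
@Override
public void close() throws IOException {
    if (wrt != null) {
        // make sure buffered records reach the stream before the format closes
        this.wrt.flush();
        this.wrt.close();
    }
    super.close();
}
```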

## Brief change log
- Add a flush call in ScalaCsvOutputFormat before close.
## Verifying this change
- unit tests.
## Does this pull request potentially affect one of the following parts:
- no

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/buptljy/flink FLINK-9941

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6412.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6412


commit 636456d2398bef69a805b96dfb0945459cfcfada
Author: wind 
Date:   2018-07-25T06:01:36Z

flush ScalaCsvOutputFormat before close




---


[GitHub] flink pull request #6403: [FLINK-9934] [table] Fix invalid field mapping by ...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6403


---


[GitHub] flink pull request #3124: [FLINK-5281] Extend KafkaJsonTableSources to suppo...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/3124


---


[GitHub] flink pull request #6411: [FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputForm...

2018-07-24 Thread Lemonjing
GitHub user Lemonjing opened a pull request:

https://github.com/apache/flink/pull/6411

[FLINK-9941][ScalaAPI] Flush in ScalaCsvOutputFormat before closing, to 
ensure CI stability

## What is the purpose of the change
This pull request updates the Scala API class `ScalaCsvOutputFormat` to increase CI 
stability.

## Brief change log
Add a flush call before close in ScalaCsvOutputFormat for the Scala 
API.

## Verifying this change
This change is already covered by existing tests, such as 
ScalarFunctionsTest

## Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): (yes / **no**)
- The public API, i.e., is any changed class annotated with 
@Public(Evolving): (yes / **no**)
- The serializers: (yes / **no** / don't know)
- The runtime per-record code paths (performance sensitive): (yes / **no** 
/ don't know)
- Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
- The S3 file system connector: (yes / **no** / don't know)

## Documentation
Does this pull request introduce a new feature? (yes / **no**)
If yes, how is the feature documented? (not applicable / docs / JavaDocs / 
**not documented**)

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/Lemonjing/flink csv-close-hotfix

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6411.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6411


commit 6c6120722eef81c2c275b92a13a5687fef35e7bb
Author: lemonjing <932191671@...>
Date:   2018-07-25T05:46:55Z

[hotfix] Flush in ScalaCsvOutputFormat before closing, to ensure CI 
stability




---


[GitHub] flink pull request #6410: Release 1.6

2018-07-24 Thread uang520
Github user uang520 closed the pull request at:

https://github.com/apache/flink/pull/6410


---


[GitHub] flink pull request #6410: Release 1.6

2018-07-24 Thread uang520
GitHub user uang520 opened a pull request:

https://github.com/apache/flink/pull/6410

Release 1.6

*Thank you very much for contributing to Apache Flink - we are happy that 
you want to help us improve Flink. To help the community review your 
contribution in the best possible way, please go through the checklist below, 
which will get the contribution into a shape in which it can be best reviewed.*

*Please understand that we do not do this to make contributions to Flink a 
hassle. In order to uphold a high standard of quality for code contributions, 
while at the same time managing a large number of contributions, we need 
contributors to prepare the contributions well, and give reviewers enough 
contextual information for the review. Please also understand that 
contributions that do not follow this guide will take longer to review and thus 
typically be picked up with lower priority by the community.*

## Contribution Checklist

  - Make sure that the pull request corresponds to a [JIRA 
issue](https://issues.apache.org/jira/projects/FLINK/issues). Exceptions are 
made for typos in JavaDoc or documentation files, which need no JIRA issue.
  
  - Name the pull request in the form "[FLINK-] [component] Title of 
the pull request", where *FLINK-* should be replaced by the actual issue 
number. Skip *component* if you are unsure about which is the best component.
  Typo fixes that have no associated JIRA issue should be named following 
this pattern: `[hotfix] [docs] Fix typo in event time introduction` or 
`[hotfix] [javadocs] Expand JavaDoc for PuncuatedWatermarkGenerator`.

  - Fill out the template below to describe the changes contributed by the 
pull request. That will give reviewers the context they need to do the review.
  
  - Make sure that the change passes the automated tests, i.e., `mvn clean 
verify` passes. You can set up Travis CI to do that following [this 
guide](http://flink.apache.org/contribute-code.html#best-practices).

  - Each pull request should address only one issue, not mix up code from 
multiple issues.
  
  - Each commit in the pull request has a meaningful commit message 
(including the JIRA id)

  - Once all items of the checklist are addressed, remove the above text 
and this checklist, leaving only the filled out template below.


**(The sections below can be removed for hotfixes of typos)**

## What is the purpose of the change

*(For example: This pull request makes task deployment go through the blob 
server, rather than through RPC. That way we avoid re-transferring them on each 
deployment (during recovery).)*


## Brief change log

*(for example:)*
  - *The TaskInfo is stored in the blob store on job creation time as a 
persistent artifact*
  - *Deployments RPC transmits only the blob storage reference*
  - *TaskManagers retrieve the TaskInfo from the blob cache*


## Verifying this change

*(Please pick either of the following options)*

This change is a trivial rework / code cleanup without any test coverage.

*(or)*

This change is already covered by existing tests, such as *(please describe 
tests)*.

*(or)*

This change added tests and can be verified as follows:

*(example:)*
  - *Added integration tests for end-to-end deployment with large payloads 
(100MB)*
  - *Extended integration test for recovery after master (JobManager) 
failure*
  - *Added test that validates that TaskInfo is transferred only once 
across recoveries*
  - *Manually verified the change by running a 4 node cluser with 2 
JobManagers and 4 TaskManagers, a stateful streaming program, and killing one 
JobManager and two TaskManagers during the execution, verifying that recovery 
happens correctly.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / no)
  - The serializers: (yes / no / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / no / 
don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / no / don't know)
  - The S3 file system connector: (yes / no / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / no)
  - If yes, how is the feature documented? (not applicable / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/apache/flink release-1.6

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6410.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6410


---


[GitHub] flink pull request #6401: [hotfix]fix typo for variable name dynamicProperti...

2018-07-24 Thread rileyli
Github user rileyli closed the pull request at:

https://github.com/apache/flink/pull/6401


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread glaksh100
Github user glaksh100 commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204942574
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+   }
+   return maxNumberOfRecordsPerFetch;
--- End diff --

Oops, thanks for catching. Updated to use the return value and also to use 
a local variable in the method to avoid re-assigning the class variable 
`maxNumberOfRecordsPerFetch`.


---


[GitHub] flink pull request #6408: [FLINK-9897][Kinesis Connector] Make adaptive read...

2018-07-24 Thread tweise
Github user tweise commented on a diff in the pull request:

https://github.com/apache/flink/pull/6408#discussion_r204922265
  
--- Diff: 
flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/ShardConsumer.java
 ---
@@ -233,26 +225,68 @@ public void run() {

subscribedShard.getShard().getHashKeyRange().getEndingHashKey());
 
long recordBatchSizeBytes = 0L;
-   long averageRecordSizeBytes = 0L;
-
for (UserRecord record : 
fetchedRecords) {
recordBatchSizeBytes += 
record.getData().remaining();

deserializeRecordForCollectionAndUpdateState(record);
}
 
-   if (useAdaptiveReads && 
!fetchedRecords.isEmpty()) {
-   averageRecordSizeBytes = 
recordBatchSizeBytes / fetchedRecords.size();
-   maxNumberOfRecordsPerFetch = 
getAdaptiveMaxRecordsPerFetch(averageRecordSizeBytes);
-   }
-
nextShardItr = 
getRecordsResult.getNextShardIterator();
+
+   long processingEndTimeNanos = 
System.nanoTime();
+
+   long adjustmentEndTimeNanos = 
adjustRunLoopFrequency(processingStartTimeNanos, processingEndTimeNanos);
+   long runLoopTimeNanos = 
adjustmentEndTimeNanos - processingStartTimeNanos;
+   adaptRecordsToRead(runLoopTimeNanos, 
fetchedRecords.size(), recordBatchSizeBytes);
+   processingStartTimeNanos = 
adjustmentEndTimeNanos; // for next time through the loop
}
}
} catch (Throwable t) {
fetcherRef.stopWithError(t);
}
}
 
+   /**
+* Adjusts loop timing to match target frequency if specified.
+* @param processingStartTimeNanos The start time of the run loop "work"
+* @param processingEndTimeNanos The end time of the run loop "work"
+* @return The System.nanoTime() after the sleep (if any)
+* @throws InterruptedException
+*/
+   protected long adjustRunLoopFrequency(long processingStartTimeNanos, 
long processingEndTimeNanos)
+   throws InterruptedException {
+   long endTimeNanos = processingEndTimeNanos;
+   if (fetchIntervalMillis != 0) {
+   long processingTimeNanos = processingEndTimeNanos - 
processingStartTimeNanos;
+   long sleepTimeMillis = fetchIntervalMillis - 
(processingTimeNanos / 1_000_000);
+   if (sleepTimeMillis > 0) {
+   Thread.sleep(sleepTimeMillis);
+   endTimeNanos = System.nanoTime();
+   }
+   }
+   return endTimeNanos;
+   }
+
+   /**
+* Calculates how many records to read each time through the loop based 
on a target throughput
+* and the measured frequenecy of the loop.
+* @param runLoopTimeNanos The total time of one pass through the loop
+* @param numRecords The number of records of the last read operation
+* @param recordBatchSizeBytes The total batch size of the last read 
operation
+*/
+   protected int adaptRecordsToRead(long runLoopTimeNanos, int numRecords, 
long recordBatchSizeBytes) {
+   if (useAdaptiveReads && numRecords != 0 && runLoopTimeNanos != 
0) {
+   long averageRecordSizeBytes = recordBatchSizeBytes / 
numRecords;
+   // Adjust number of records to fetch from the shard 
depending on current average record size
+   // to optimize 2 Mb / sec read limits
+   double loopFrequencyHz = 10.0d / 
runLoopTimeNanos;
+   double bytesPerRead = 
KINESIS_SHARD_BYTES_PER_SECOND_LIMIT / loopFrequencyHz;
+   maxNumberOfRecordsPerFetch = (int) (bytesPerRead / 
averageRecordSizeBytes);
+   // Ensure the value is not more than 1L
+   maxNumberOfRecordsPerFetch = 
Math.min(maxNumberOfRecordsPerFetch, 
ConsumerConfigConstants.DEFAULT_SHARD_GETRECORDS_MAX);
+   }
+   return maxNumberOfRecordsPerFetch;
--- End diff --

the return value is never used


---


[GitHub] flink pull request #6409: Flink 9899.kinesis connector metrics

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6409

Flink 9899.kinesis connector metrics

## What is the purpose of the change

The purpose of this change is to add metrics to the `ShardConsumer` to get 
more observability into the performance of the Kinesis connector, including the 
enhancements introduced in 
[FLINK-9897](https://issues.apache.org/jira/browse/FLINK-9897). 

**Important** - https://github.com/apache/flink/pull/6408 has to be merged 
**before** taking out this change.

## Brief change log
All metrics are added as gauges. The following per-shard metrics are added 
(a registration sketch follows the list):
- sleepTimeMillis
- maxNumberOfRecordsPerFetch
- numberOfAggregatedRecordsPerFetch
- numberOfDeaggregatedRecordsPerFetch
- bytesRequestedPerFetch
- averageRecordSizeBytes
- runLoopTimeNanos
- loopFrequencyHz

## Verifying this change

This change is already covered by existing tests, such as: 
`ShardConsumerTest`, `KinesisDataFetcherTest`.

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9899.KinesisConnectorMetrics

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6409.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6409


commit f333781a7c4f1a10b6120a962ff211e023bafaab
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

Remove unused method

commit f51703177df9afcdba3778909b1e9d8b7fa4bf46
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis

commit d493097d09c6223383282ed90648853715b197ce
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T21:13:53Z

[FLINK-9899] Add more ShardConsumer metrics

Checkstyle fix




---


[GitHub] flink pull request #6408: [FLINK-9897] Make adaptive reads depend on run loo...

2018-07-24 Thread glaksh100
GitHub user glaksh100 opened a pull request:

https://github.com/apache/flink/pull/6408

[FLINK-9897] Make adaptive reads depend on run loop time instead of 
fetchintervalmillis

## What is the purpose of the change
[FLINK-9692](https://github.com/apache/flink/pull/6300) introduced the 
feature of adapting `maxNumberOfRecordsPerFetch` based on the average size of 
Kinesis records. The PR assumed a maximum of `1/fetchIntervalMillis` 
reads/second. However, in the case that the run loop of the `ShardConsumer` 
takes more than `fetchIntervalMillis` to process records, the 
`maxNumberOfRecordsPerFetch` is still sub-optimal. The purpose of this change 
is to make the adaptive reads more efficient by using the actual run loop 
frequency to determine the number of reads/second and 
`maxNumberOfRecordsPerFetch`. The change also re-factors the run loop to be 
more modular.
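For illustration, with assumed numbers: if one pass through the run loop takes 
500 ms, the loop runs at 2 Hz; staying under the ~2 MB/s per-shard read limit 
then allows about 1 MB per fetch, and with an average record size of 1 KB that 
works out to roughly 1,000 records per fetch (capped at the configured maximum).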


## Brief change log

  - `processingStartTimeNanos` records start time of loop
  -  `processingEndTimeNanos` records end time of loop
  -  `adjustRunLoopFrequency()` adjusts end time depending on 
`sleepTimeMillis` (if any).
  -  `runLoopTimeNanos` records actual run loop time.
  -  `adaptRecordsToRead` calculates `maxNumberOfRecordsPerFetch` based on 
`runLoopTimeNanos`
  - Unused method `getAdaptiveMaxRecordsPerFetch` is removed.

## Verifying this change

This change is already covered by existing tests, such as 
`ShardConsumerTest`
This has also been tested against a stream with the following configuration
```
Number of Shards: 512
Parallelism: 128
```

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (yes / **no**)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (yes / **no** / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/lyft/flink FLINK-9897.AdaptiveReadsRunLoop

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6408.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6408


commit 786556b9a9a509051a14772fbbd282db73e65252
Author: Lakshmi Gururaja Rao 
Date:   2018-07-24T18:44:08Z

[FLINK-9897] Make adaptive reads depend on run loop time instead of fetch 
interval millis




---


[GitHub] flink pull request #6407: [FLINK-8478][docs] Add documentation page for diff...

2018-07-24 Thread florianschmidt1994
GitHub user florianschmidt1994 opened a pull request:

https://github.com/apache/flink/pull/6407

[FLINK-8478][docs] Add documentation page for different datastream joins

## What is the purpose of the change

Add a documentation page under Application Development / Streaming / 
Joining that describes
1. The different types of window joins in the DataStream API
2. The newly introduced interval join in the DataStream API
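
Below is a minimal sketch of the interval join mentioned in point 2 above; 
element types and bounds are illustrative, and timestamp/watermark assignment 
is omitted:

```
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class IntervalJoinSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // toy inputs; a real job would also assign timestamps and watermarks
        DataStream<Integer> orangeStream = env.fromElements(1, 2, 3);
        DataStream<Integer> greenStream = env.fromElements(1, 2, 3);

        orangeStream
            .keyBy(value -> value)
            .intervalJoin(greenStream.keyBy(value -> value))
            .between(Time.milliseconds(-2), Time.milliseconds(1))
            .process(new ProcessJoinFunction<Integer, Integer, String>() {
                @Override
                public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
                    out.collect(left + "," + right);
                }
            })
            .print();

        env.execute("interval join sketch");
    }
}
```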

## Brief change log
  - Added a new docs page
  - Added images to describe common scenarios

## Verifying this change

Built the documentation with `./build_docs.sh` and it looks like expected

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): no
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: no
  - The serializers: no
  - The runtime per-record code paths (performance sensitive): no
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: no
  - The S3 file system connector: no

## Documentation

  - Does this pull request introduce a new feature? no
  - If yes, how is the feature documented? not applicable

For those that just want to have a "quick look" I attached a screenshot

![](https://i.imgur.com/c78WuD7.jpg)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/florianschmidt1994/flink 
flink-8478-add-docs-for-joins

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6407.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6407


commit d2800ff33af179dd32876020d27994b9dc6579aa
Author: Florian Schmidt 
Date:   2018-07-24T16:14:50Z

[FLINK-8478] Add documentation page for different datastream joins




---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6406#discussion_r204820593
  
--- Diff: flink-docs/README.md ---
@@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full 
reference of the REST A
 To integrate a new endpoint into the generator
 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that 
extends the new endpoint class
 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main`
-3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu`
+3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests`
--- End diff --

ah wait a second, this would run the test for all modules. urgh...


---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6406#discussion_r204819304
  
--- Diff: flink-docs/README.md ---
@@ -28,7 +28,7 @@ The `RestAPIDocGenerator` can be used to generate a full 
reference of the REST A
 To integrate a new endpoint into the generator
 1. Add a new `DocumentingRestEndpoint` class to `RestAPIDocGenerator` that 
extends the new endpoint class
 2. Add another call to `createHtmlFile` in `RestAPIDocGenerator#main`
-3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu`
+3. Regenerate the documentation by running `mvn package 
-Dgenerate-rest-docs -pl flink-docs -am -nsu -DskipTests`
--- End diff --

I intentionally left this option out. This guarantees the correctness of 
the output since it both tests the generator before the generation, and the 
completeness afterwards. There are a lot of additional options one _could_ add, 
but I'd prefer that we stick to the _necessary_ ones.


---


[GitHub] flink pull request #6406: [FLINK-9159][runtime] Sanity check default timeout...

2018-07-24 Thread GJL
GitHub user GJL opened a pull request:

https://github.com/apache/flink/pull/6406

[FLINK-9159][runtime] Sanity check default timeout values

## What is the purpose of the change

*Set the default timeouts for resource release to sane values. Consolidate 
config keys and documentation.*

## Brief change log

  - *Set default value of `mesos.failover-timeout` to 1 week.*
  - *Deprecate config key `slotmanager.request-timeout`*
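
A rough sketch of how a key deprecation like the one in the change log above is 
typically expressed with Flink's `ConfigOptions`; the holder class and the 
default value are illustrative, not taken from the PR:

```
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

// Hypothetical holder class; a 5 minute default is used only for illustration.
public class SlotRequestTimeoutOptions {
    public static final ConfigOption<Long> SLOT_REQUEST_TIMEOUT = ConfigOptions
        .key("slot.request.timeout")
        .defaultValue(5L * 60L * 1000L)
        .withDeprecatedKeys("slotmanager.request-timeout");
}
```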


## Verifying this change

This change added tests and can be verified as follows:

  - *Added test `SlotManagerConfigurationTest` to verify that slot request 
timeouts are set correctly.*
  - *Manually deployed on Mesos 1.5.0.*

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (yes / **no**)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (**yes** / no)
  - The serializers: (yes / **no** / don't know)
  - The runtime per-record code paths (performance sensitive): (yes / 
**no** / don't know)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (**yes** / no / don't know)
  - The S3 file system connector: (yes / **no** / don't know)

## Documentation

  - Does this pull request introduce a new feature? (yes / **no**)
  - If yes, how is the feature documented? (**not applicable** / docs / 
JavaDocs / not documented)

cc: @tillrohrmann 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/GJL/flink FLINK-9159

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6406.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6406


commit 96ef18e322c5623a609382057985b35971ba3234
Author: gyao 
Date:   2018-07-24T13:38:47Z

[FLINK-9159][mesos] Set default value of mesos.failover-timeout to 1 week.

commit 112122912d7dd78c612c1648f3e2b041ae65afa6
Author: gyao 
Date:   2018-07-24T13:48:27Z

[FLINK-9159][runtime] Deprecate config key slotmanager.request-timeout

- Replace config key slotmanager.request-timeout with slot.request.timeout 
because
both keys have effectively the same semantics.
- Rename key slotmanager.taskmanager-timeout to
resourcemanager.taskmanager-timeout.

commit 787f7c1480a5676e7ce52092265b3cd051064e3c
Author: gyao 
Date:   2018-07-24T13:55:16Z

[hotfix][docs] Add -DskipTests flag to command that generates docs.




---


[GitHub] flink pull request #6398: [FLINK-9923][tests] Harden OneInputStreamTaskTest#...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6398


---


[GitHub] flink pull request #6405: [FLINK-8439] Add Flink shading to AWS credential p...

2018-07-24 Thread azagrebin
GitHub user azagrebin opened a pull request:

https://github.com/apache/flink/pull/6405

[FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop c…

## What is the purpose of the change

This PR refactors S3 Hadoop and Presto file systems and adds Flink shading 
to AWS credential provider config.

## Brief change log

  - extract `AbstractS3FileSystemFactory` base class from `s3hadoop.` and 
`s3presto.S3FileSystemFactory`s
  - extract hadoop configuration logic into `HadoopConfigLoader` with Flink 
shading of certain Hadoop configs
  -  add Flink shading to AWS credential provider config of 
`S3FileSystemFactory`s
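
A hypothetical sketch of the credential-provider "shading" described in the 
last item; the relocation prefix and the matching rule are made up for 
illustration:

```
// Sketch only: rewrite a user-configured AWS credentials provider class name
// so that it resolves against the relocated (shaded) package inside the
// bundled filesystem jar.
public final class CredentialProviderShading {

    // illustrative prefix; the real one depends on the shading/relocation setup
    private static final String SHADING_PREFIX = "org.apache.flink.fs.s3hadoop.shaded.";

    public static String shadeClassName(String configuredClass) {
        if (configuredClass != null && configuredClass.startsWith("com.amazonaws.")) {
            return SHADING_PREFIX + configuredClass;
        }
        return configuredClass;
    }

    private CredentialProviderShading() {
    }
}
```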

## Verifying this change

run unit tests

## Does this pull request potentially affect one of the following parts:

  - Dependencies (does it add or upgrade a dependency): (no)
  - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: (no)
  - The serializers: (no)
  - The runtime per-record code paths (performance sensitive): (no)
  - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  - The S3 file system connector: (yes)

## Documentation

  - Does this pull request introduce a new feature? (no)
  - If yes, how is the feature documented? (not applicable)


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/azagrebin/flink FLINK-8439

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/flink/pull/6405.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #6405


commit 44c6eafb6b0757deb89f4e4a7e9bb237f7336428
Author: Andrey Zagrebin 
Date:   2018-07-23T16:10:55Z

[FLINK-8439] Add Flink shading to AWS credential provider s3 hadoop config




---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204758822
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -28,15 +35,21 @@
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
  * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
- * {@code /metrics?get=X,Y}
+ * {@code /metrics?get=X,Y OR /metrics?get=X,Y&&subtasks=0-4,7-10}
  * The handler will then return a list containing the values of the 
requested metrics.
  * {@code [ { "id" : "X", "value" : "S" }, { "id" : "Y", "value" : "T" } ] 
}
  *
  * @deprecated This class is subsumed by {@link SubtaskMetricsHandler} and 
is only kept for backwards-compatibility.
  */
 @Deprecated
 public class JobVertexMetricsHandler extends AbstractMetricsHandler {
+   private final Logger log = LoggerFactory.getLogger(getClass());
--- End diff --

either add this as a protected field to the `AbstractMetricsHandler`, or 
change it to being static.
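
The static variant would be the usual slf4j pattern:

```
private static final Logger LOG = LoggerFactory.getLogger(JobVertexMetricsHandler.class);
```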


---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204757730
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -57,4 +70,77 @@ public JobVertexMetricsHandler(Executor executor, 
MetricFetcher fetcher) {
? task.metrics
: null;
}
+
+   @Override
+   protected String getRequestMetricsList(Map queryParams) 
{
+   String metricRequestsList = queryParams.get(PARAMETER_METRICS);
+   String subtasksList = queryParams.get(SUB_TASKS);
+   if (subtasksList == null || subtasksList.isEmpty()) {
+   return queryParams.get(PARAMETER_METRICS);
--- End diff --

return `super.getRequestMetricsList(queryParams)` instead


---


[GitHub] flink pull request #6400: [FLINK-9931][ui] watermark display bug.

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6400#discussion_r204759313
  
--- Diff: 
flink-runtime/src/main/java/org/apache/flink/runtime/rest/handler/legacy/metrics/JobVertexMetricsHandler.java
 ---
@@ -28,15 +35,21 @@
  * {@code {"available": [ { "name" : "X", "id" : "X" } ] } }
  *
  * If the query parameters do contain a "get" parameter, a 
comma-separated list of metric names is expected as a value.
- * {@code /metrics?get=X,Y}
+ * {@code /metrics?get=X,Y OR /metrics?get=X,Y&&subtasks=0-4,7-10}
--- End diff --

should only be a single `&`


---


[GitHub] flink pull request #6388: [FLINK-6222] Allow passing env variables to start ...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6388#discussion_r204750473
  
--- Diff: flink-dist/src/main/flink-bin/bin/config.sh ---
@@ -96,6 +96,8 @@ DEFAULT_ENV_JAVA_OPTS=""# 
Optional JVM args
 DEFAULT_ENV_JAVA_OPTS_JM="" # Optional JVM args 
(JobManager)
 DEFAULT_ENV_JAVA_OPTS_TM="" # Optional JVM args 
(TaskManager)
 DEFAULT_ENV_SSH_OPTS="" # Optional SSH 
parameters running in cluster mode
+DEFAULT_YARN_CONF_DIR=""# YARN Configuration Directory, if 
necessary
--- End diff --

indentation?


---


[GitHub] flink pull request #6393: [FLINK-9296] [table] Add support for non-windowed ...

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6393


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720999
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -297,7 +300,13 @@ public void testFailingDataSinkTask() {
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
 
-   super.registerFileOutputTask(MockFailingOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721547
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
 ---
@@ -45,33 +46,30 @@
 
 public class DataSourceTaskTest extends TaskTestBase {
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
private List outList;
-   
-   private String tempTestPath = 
DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test");
-   
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";

@Test
public void testDataSourceTask() {
int keyCnt = 100;
int valCnt = 20;

this.outList = new ArrayList();
-   
+   File tempTestFile = null;
try {
-   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false), 
-   this.tempTestPath, true);
+   tempTestFile = tempFolder.newFile(tempTestFileName);
+   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false),
+   tempTestFile, true);
} catch (IOException e1) {
+   System.err.println(e1);
--- End diff --

let's change the method signature to throw `IOException` and remove the 
try/catch block.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721015
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -337,7 +345,13 @@ public void testFailingSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockFailingOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721035
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -363,7 +376,9 @@ public void testCancelDataSinkTask() throws Exception {
Configuration stubParams = new Configuration();
super.getTaskConfig().setStubParameters(stubParams);
 
-   super.registerFileOutputTask(MockOutputFormat.class,  new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720914
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -79,12 +75,11 @@ public void testDataSinkTask() {
 
DataSinkTask testTask = new 
DataSinkTask<>(this.mockEnv);
 
-   super.registerFileOutputTask(MockOutputFormat.class, 
new File(tempTestPath).toURI().toString());
+   File tempTestFile = 
tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721046
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -419,7 +432,13 @@ public void testCancelSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockOutputFormat.class,  new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204721390
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSourceTaskTest.java
 ---
@@ -45,33 +46,30 @@
 
 public class DataSourceTaskTest extends TaskTestBase {
 
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();
+
private static final int MEMORY_MANAGER_SIZE = 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
private List outList;
-   
-   private String tempTestPath = 
DataSinkTaskTest.constructTestPath(DataSourceTaskTest.class, "dst_test");
-   
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";

@Test
public void testDataSourceTask() {
int keyCnt = 100;
int valCnt = 20;

this.outList = new ArrayList();
-   
+   File tempTestFile = null;
try {
-   InputFilePreparator.prepareInputFile(new 
UniformRecordGenerator(keyCnt, valCnt, false), 
-   this.tempTestPath, true);
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720983
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -225,7 +224,13 @@ public void testSortingDataSinkTask() {
super.getTaskConfig().setFilehandlesInput(0, 8);
super.getTaskConfig().setSpillingThresholdInput(0, 0.8f);
 
-   super.registerFileOutputTask(MockOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204718919
  
--- Diff: 
flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBStateBackendConfigTest.java
 ---
@@ -481,10 +479,6 @@ public void 
testCallsForwardedToNonPartitionedBackend() throws Exception {
env.getTaskKvStateRegistry());
}
 
-   static Environment getMockEnvironment() {
-   return getMockEnvironment(new File[] { new 
File(System.getProperty("java.io.tmpdir")) });
-   }
-
static Environment getMockEnvironment(File[] tempDirs) {
--- End diff --

we could change this to a vararg method; then we could simplify calls like 
`getMockEnvironment(new File[] { tempFolder.newFolder() });`.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204722139
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 ---
@@ -240,36 +205,19 @@ public void testOnlyLevel2NestedDirectories() {
 */
@Test
public void testTwoNestedDirectoriesWithFilteredFilesTrue() {
-
-   String sep = System.getProperty("file.separator");
-
try {
String firstLevelDir = TestFileUtils.randomFileName();
String secondLevelDir = TestFileUtils.randomFileName();
String thirdLevelDir = TestFileUtils.randomFileName();
String secondLevelFilterDir = 
"_"+TestFileUtils.randomFileName();
String thirdLevelFilterDir = 
"_"+TestFileUtils.randomFileName();
 
-   File nestedDir = new File(tempPath + sep + 
firstLevelDir);
-   nestedDir.mkdirs();
-   nestedDir.deleteOnExit();
-
-   File insideNestedDir = new File(tempPath + sep + 
firstLevelDir + sep + secondLevelDir);
-   insideNestedDir.mkdirs();
-   insideNestedDir.deleteOnExit();
-   File insideNestedDirFiltered = new File(tempPath + sep 
+ firstLevelDir + sep + secondLevelFilterDir);
-   insideNestedDirFiltered.mkdirs();
-   insideNestedDirFiltered.deleteOnExit();
-   File filteredFile = new File(tempPath + sep + 
firstLevelDir + sep + "_IWillBeFiltered");
-   filteredFile.createNewFile();
-   filteredFile.deleteOnExit();
-
-   File nestedNestedDir = new File(tempPath + sep + 
firstLevelDir + sep + secondLevelDir + sep + thirdLevelDir);
-   nestedNestedDir.mkdirs();
-   nestedNestedDir.deleteOnExit();
-   File nestedNestedDirFiltered = new File(tempPath + sep 
+ firstLevelDir + sep + secondLevelDir + sep + thirdLevelFilterDir);
-   nestedNestedDirFiltered.mkdirs();
-   nestedNestedDirFiltered.deleteOnExit();
+   File nestedNestedDirFiltered = 
tempFolder.newFolder(firstLevelDir, secondLevelDir, thirdLevelDir, 
thirdLevelFilterDir);
+   File nestedNestedDir = 
nestedNestedDirFiltered.getParentFile();
+   File insideNestedDir = nestedNestedDir.getParentFile();
+   File nestedDir = insideNestedDir.getParentFile();
+   File insideNestedDirFiltered = 
tempFolder.newFolder(firstLevelDir, secondLevelFilterDir);
+   new File(nestedDir, "_IWillBeFiltered");
--- End diff --

This file isn't explicitly created anymore.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204719730
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java
 ---
@@ -127,7 +132,7 @@ public void testSlotAllocation() throws Exception {
TestingUtils.infiniteTime());
 
final File[] taskExecutorLocalStateRootDirs =
-   new File[]{new 
File(System.getProperty("java.io.tmpdir"), "localRecovery")};
+   new File[]{ tempFolder.newFolder("localRecovery") };
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720974
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -141,7 +136,13 @@ public void testUnionDataSinkTask() {
 
DataSinkTask testTask = new 
DataSinkTask<>(this.mockEnv);
 
-   super.registerFileOutputTask(MockOutputFormat.class, new 
File(tempTestPath).toURI().toString());
+   File tempTestFile = null;
+   try {
+   tempTestFile = tempFolder.newFile(tempTestFileName);
--- End diff --

same issue as in `JarFileCreatorTest`


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204719565
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/util/JarFileCreatorTest.java
 ---
@@ -218,7 +224,7 @@ public void TestAnonymousClass() throws IOException {
 
@Test
public void TestExtendIdentifier() throws IOException {
-   File out = new File(System.getProperty("java.io.tmpdir"), 
"jarcreatortest.jar");
+   File out = tempFolder.newFile("jarcreatortest.jar");
--- End diff --

to preserve the existing behavior of passing a file that does not exist, do 
this instead:
```
File out = new File(tempFolder.getRoot(), "jarcreatortest.jar");
```

Also applies to the other tests in this file.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720758
  
--- Diff: 
flink-runtime/src/test/java/org/apache/flink/runtime/operators/DataSinkTaskTest.java
 ---
@@ -49,22 +50,17 @@
 import static org.junit.Assert.assertTrue;
 
 public class DataSinkTaskTest extends TaskTestBase {
+
+   @Rule
+   public TemporaryFolder tempFolder = new TemporaryFolder();

private static final Logger LOG = 
LoggerFactory.getLogger(DataSinkTaskTest.class);
 
private static final int MEMORY_MANAGER_SIZE = 3 * 1024 * 1024;
 
private static final int NETWORK_BUFFER_SIZE = 1024;
 
-   private final String tempTestPath = 
constructTestPath(DataSinkTaskTest.class, "dst_test");
-
-   @After
-   public void cleanUp() {
-   File tempTestFile = new File(this.tempTestPath);
-   if(tempTestFile.exists()) {
-   tempTestFile.delete();
-   }
-   }
+   private final String tempTestFileName = getClass().getName() + 
"-dst_test";
--- End diff --

since this was only used to prevent name clashes between tests it should 
now be redundant and can be removed.


---


[GitHub] flink pull request #6399: [FLINK-5860] [tests] Replace java.io.tmpdir with J...

2018-07-24 Thread zentol
Github user zentol commented on a diff in the pull request:

https://github.com/apache/flink/pull/6399#discussion_r204720065
  
--- Diff: 
flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java
 ---
@@ -338,22 +279,11 @@ public void testGetStatisticsMultipleNestedFiles() {
String secondLevelDir = TestFileUtils.randomFileName();
String secondLevelDir2 = TestFileUtils.randomFileName();
 
-   File nestedDir = new File(tempPath + 
System.getProperty("file.separator") 
-   + firstLevelDir);
-   nestedDir.mkdirs();
-   nestedDir.deleteOnExit();
+   File insideNestedDir = 
tempFolder.newFolder(firstLevelDir, secondLevelDir);
+   File insideNestedDir2 = 
tempFolder.newFolder(firstLevelDir, secondLevelDir2);
+   File nestedDir = insideNestedDir.getParentFile();
 
-   File insideNestedDir = new File(tempPath + 
System.getProperty("file.separator") 
-   + firstLevelDir + 
System.getProperty("file.separator") + secondLevelDir);
-   insideNestedDir.mkdirs();
-   insideNestedDir.deleteOnExit();
-
-   File insideNestedDir2 = new File(tempPath + 
System.getProperty("file.separator")
-   + firstLevelDir + 
System.getProperty("file.separator") + secondLevelDir2);
-   insideNestedDir2.mkdirs();
-   insideNestedDir2.deleteOnExit();
-
-   // create a file in the first-level and two files in 
the nested dir
+   // create a file in the first-level and two 
files in the nested dir
--- End diff --

revert indentation change


---


[GitHub] flink pull request #6341: [FLINK-5750] Incorrect translation of n-ary Union

2018-07-24 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/flink/pull/6341


---

