[flink] branch master updated: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

2020-06-05 Thread zhijiang
This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new d25609d  [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
d25609d is described below

commit d25609d07f52ba9bac3f7bb33ce0635e255e3d9d
Author: Arvid Heise 
AuthorDate: Thu Jun 4 22:51:18 2020 +0200

[FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.

CheckpointBarrierUnaligner#hasInflightData was not called with input gate 
contextual information, such that only the same first few channels are checked 
during initial snapshotting of inflight data for multi-gate setups.
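
For context, a bare channel index is ambiguous as soon as a task reads from more than one input gate, because every gate numbers its channels from 0; a channel is only identified by the (gate, channel) pair that InputChannelInfo carries. A minimal sketch of the distinction, using simplified stand-in types rather than Flink's exact API:

import java.util.Map;
import java.util.Set;

final class InflightDataCheckSketch {

	// Simplified stand-in for org.apache.flink.runtime.checkpoint.channel.InputChannelInfo.
	static final class ChannelInfo {
		final int gateIndex;    // which input gate the channel belongs to
		final int channelIndex; // index of the channel within that gate

		ChannelInfo(int gateIndex, int channelIndex) {
			this.gateIndex = gateIndex;
			this.channelIndex = channelIndex;
		}
	}

	// Before the fix (conceptually): keyed by the channel index alone, so
	// channel 0 of gate 0 and channel 0 of gate 1 cannot be told apart.
	static boolean hasInflightData(Set<Integer> pendingChannels, int channelIndex) {
		return pendingChannels.contains(channelIndex);
	}

	// After the fix (conceptually): the gate index disambiguates the channel.
	static boolean hasInflightData(Map<Integer, Set<Integer>> pendingPerGate, ChannelInfo channel) {
		Set<Integer> pending = pendingPerGate.get(channel.gateIndex);
		return pending != null && pending.contains(channel.channelIndex);
	}
}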
---
 .../partition/consumer/SingleInputGateBuilder.java |  4 +-
 .../io/AlternatingCheckpointBarrierHandler.java|  5 +-
 .../runtime/io/CheckpointBarrierHandler.java   |  3 +-
 .../runtime/io/CheckpointBarrierUnaligner.java |  4 +-
 .../runtime/io/CheckpointedInputGate.java  |  5 +-
 .../AlternatingCheckpointBarrierHandlerTest.java   |  4 +-
 .../runtime/io/StreamTaskNetworkInputTest.java | 73 +-
 7 files changed, 85 insertions(+), 13 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
index ad607f7..b279998 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java
@@ -70,12 +70,12 @@ public class SingleInputGateBuilder {
return this;
}
 
-   SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) {
+   public SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) {
this.consumedSubpartitionIndex = consumedSubpartitionIndex;
return this;
}
 
-   SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
+   public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) {
this.gateIndex = gateIndex;
return this;
}
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
index 09f05b9..8ed6788 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java
@@ -18,6 +18,7 @@
 package org.apache.flink.streaming.runtime.io;
 
 import org.apache.flink.runtime.checkpoint.CheckpointException;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener;
@@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler {
}
 
@Override
-   public boolean hasInflightData(long checkpointId, int channelIndex) {
+   public boolean hasInflightData(long checkpointId, InputChannelInfo channelInfo) {
	// should only be called for unaligned checkpoint
-   return unalignedHandler.hasInflightData(checkpointId, channelIndex);
+   return unalignedHandler.hasInflightData(checkpointId, channelInfo);
}
 
@Override
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
index 07f6f7a..952af24 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java
@@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
+import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.CheckpointBarrier;
 import 

[flink] 02/05: [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 31c3a15731e91d01fb033fe5a8f8173e3ba0cb38
Author: Aljoscha Krettek 
AuthorDate: Thu Jun 4 10:33:24 2020 +0200

[FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
---
 .../org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index a651d12..5dc1137 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -278,7 +278,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
.setStdoutProcessor(messages::add)
.runNonBlocking()) {
 
-   final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
+   final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
try {
LOG.info("Waiting for messages. Received {}/{}.", messages.size(),



[flink] 01/05: [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5
Author: Aljoscha Krettek 
AuthorDate: Thu Jun 4 10:16:06 2020 +0200

[FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase

Before, it could happen that we time out and return early, which would
lead to a test failure. Now, we would fail at the source of the problem.
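
As an illustration of the changed contract, a minimal sketch of reading until a deadline and then failing loudly when the expected count was not reached (the in-memory queue and the exception message are assumptions, not the actual test-utility code, which drains a kafka-console-consumer process):

import java.io.IOException;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

final class ExpectedMessagesSketch {

	// Poll until the expected count arrives or the deadline passes; a short
	// result is now an explicit failure instead of a silently truncated list.
	static List<String> readMessages(Queue<String> source, int expectedNumMessages,
			Duration timeout) throws IOException {
		final Instant deadline = Instant.now().plus(timeout);
		final List<String> messages = new ArrayList<>(expectedNumMessages);
		while (Instant.now().isBefore(deadline) && messages.size() < expectedNumMessages) {
			final String next = source.poll(); // stand-in for draining the consumer's stdout
			if (next != null) {
				messages.add(next);
			}
		}
		if (messages.size() < expectedNumMessages) {
			throw new IOException("Expected " + expectedNumMessages
					+ " messages, but received only " + messages.size());
		}
		return messages;
	}
}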
---
 .../org/apache/flink/tests/util/kafka/KafkaResource.java  |  7 ---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java| 15 ++-
 2 files changed, 14 insertions(+), 8 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
index 679d6c4..0157ad2 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java
@@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource {
InetSocketAddress getZookeeperAddress();
 
/**
-* Reads up to {@code maxNumMessages} from the given topic.
+* Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number
+* of messages we throw an exception.
 *
-* @param maxNumMessages maximum number of messages that should be read
+* @param expectedNumMessages expected number of messages that should be read
 * @param groupId group id to identify consumer
 * @param topic topic name
 * @return read messages
 * @throws IOException
 */
-   List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException;
+   List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException;
 
/**
 * Modifies the number of partitions for the given topic.
diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
index 405690f..a651d12 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java
@@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
}
 
@Override
-   public List<String> readMessage(int maxNumMessages, String groupId, String topic) throws IOException {
-   final List<String> messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages));
+   public List<String> readMessage(int expectedNumMessages, String groupId, String topic) throws IOException {
+   final List<String> messages = Collections.synchronizedList(new ArrayList<>(
+   expectedNumMessages));
 
try (final AutoClosableProcess kafka = AutoClosableProcess
.create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(),
@@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
KAFKA_ADDRESS,
"--from-beginning",
"--max-messages",
-   String.valueOf(maxNumMessages),
+   String.valueOf(expectedNumMessages),
"--topic",
topic,
"--consumer-property",
@@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource {
.runNonBlocking()) {
 
final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30));
-   while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) {
+   while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) {
try {
-   LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages);
+   LOG.info("Waiting for messages. Received {}/{}.", messages.size(),
+   expectedNumMessages);

[flink] 04/05: [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a03c8f13f735bf5b8d120fb7aa95587574b6dfd6
Author: Aljoscha Krettek 
AuthorDate: Thu Jun 4 15:20:19 2020 +0200

[FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes

Duplicate topic names and leftover data could be a potential source of 
instabilities.
---
 .../org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 7 +--
 1 file changed, 5 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index bf7358d..fabefc1 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -58,6 +58,7 @@ import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.UUID;
 
 import static org.hamcrest.Matchers.arrayContainingInAnyOrder;
 import static org.junit.Assert.assertThat;
@@ -92,6 +93,7 @@ public class SQLClientKafkaITCase extends TestLogger {
@Rule
public final TemporaryFolder tmp = new TemporaryFolder();
 
+   private final String kafkaVersion;
private final String kafkaSQLVersion;
private Path result;
private Path sqlClientSessionConf;
@@ -107,6 +109,7 @@ public class SQLClientKafkaITCase extends TestLogger {
 
public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) {
this.kafka = KafkaResource.get(kafkaVersion);
+   this.kafkaVersion = kafkaVersion;
this.kafkaSQLVersion = kafkaSQLVersion;
 
this.sqlConnectorKafkaJar = TestUtils.getResourceJar(kafkaSQLJarPattern);
@@ -129,8 +132,8 @@ public class SQLClientKafkaITCase extends TestLogger {
public void testKafka() throws Exception {
try (ClusterController clusterController = flink.startCluster(2)) {
// Create topic and send message
-   String testJsonTopic = "test-json";
-   String testAvroTopic = "test-avro";
+   String testJsonTopic = "test-json-" + kafkaVersion + "-" + UUID.randomUUID().toString();
+   String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString();
kafka.createTopic(1, 1, testJsonTopic);
String[] messages = new String[]{
"{\"timestamp\": 
\"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": 
\"WARNING\", \"message\": \"This is a warning.\"}}",



[flink] 03/05: [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e4034ac296d4e8b43af570d3436cd2dec33d9ef1
Author: Aljoscha Krettek 
AuthorDate: Thu Jun 4 10:34:16 2020 +0200

[FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes

Duplicate topic names and leftover data could be a potential source of
instabilities.
---
 .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java   | 8 ++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
index 4f62839..5e64159 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java
@@ -43,6 +43,7 @@ import java.nio.file.Path;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
+import java.util.UUID;
 import java.util.stream.Collectors;
 
 /**
@@ -65,6 +66,8 @@ public class StreamingKafkaITCase extends TestLogger {
 
private final Path kafkaExampleJar;
 
+   private final String kafkaVersion;
+
@Rule
public final KafkaResource kafka;
 
@@ -81,14 +84,15 @@ public class StreamingKafkaITCase extends TestLogger {
public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) {
this.kafkaExampleJar = TestUtils.getResourceJar(kafkaExampleJarPattern);
this.kafka = KafkaResource.get(kafkaVersion);
+   this.kafkaVersion = kafkaVersion;
}
 
@Test
public void testKafka() throws Exception {
try (final ClusterController clusterController = flink.startCluster(1)) {
 
-   final String inputTopic = "test-input";
-   final String outputTopic = "test-output";
+   final String inputTopic = "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString();
+   final String outputTopic = "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString();
 
// create the required topics
kafka.createTopic(1, 1, inputTopic);



[flink] 05/05: [FLINK-18020] Increase timeout in SQLClientKafkaITCase

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cc59b3a7c4853be80a9d2868ed9e7d6c09966edc
Author: Aljoscha Krettek 
AuthorDate: Thu Jun 4 15:21:06 2020 +0200

[FLINK-18020] Increase timeout in SQLClientKafkaITCase
---
 .../apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 10 ++
 1 file changed, 6 insertions(+), 4 deletions(-)

diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
index fabefc1..f087e31 100644
--- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
+++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.tests.util.kafka;
 
+import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.tests.util.TestUtils;
 import org.apache.flink.tests.util.cache.DownloadCache;
 import org.apache.flink.tests.util.categories.TravisGroup1;
@@ -52,6 +53,7 @@ import java.net.URL;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -234,8 +236,8 @@ public class SQLClientKafkaITCase extends TestLogger {
 
private void checkCsvResultFile() throws Exception {
boolean success = false;
-   long maxRetries = 10, duration = 5000L;
-   for (int i = 0; i < maxRetries; i++) {
+   final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120));
+   while (!success && deadline.hasTimeLeft()) {
if (Files.exists(result)) {
byte[] bytes = Files.readAllBytes(result);
String[] lines = new String(bytes, Charsets.UTF_8).split("\n");
@@ -255,8 +257,8 @@ public class SQLClientKafkaITCase extends TestLogger {
} else {
LOG.info("The target CSV {} does not exist 
now", result);
}
-   Thread.sleep(duration);
+   Thread.sleep(500);
}
-   Assert.assertTrue("Timeout(" + (maxRetries * duration) + " sec) to read the correct CSV results.", success);
+   Assert.assertTrue("Did not get expected results before timeout.", success);
}
 }
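
The Deadline used above is org.apache.flink.api.common.time.Deadline; the same poll-until-deadline pattern can be expressed with plain java.time, as in this sketch (checkResult is a hypothetical stand-in for reading and validating the CSV file):

import java.time.Duration;
import java.time.Instant;
import java.util.function.BooleanSupplier;

final class DeadlinePollingSketch {

	// Poll with a short sleep until success or until a wall-clock deadline;
	// the caller asserts once on the final result, as checkCsvResultFile does.
	static boolean pollUntil(BooleanSupplier checkResult, Duration timeout)
			throws InterruptedException {
		final Instant deadline = Instant.now().plus(timeout);
		boolean success = false;
		while (!success && Instant.now().isBefore(deadline)) {
			success = checkResult.getAsBoolean();
			if (!success) {
				Thread.sleep(500); // short sleep keeps the total wait close to the deadline
			}
		}
		return success;
	}
}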



[flink] branch release-1.11 updated (39e3d6a -> cc59b3a)

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 39e3d6a  [FLINK-18048] Fix --host option for standalone application cluster
 new 6505fe4  [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
 new 31c3a15  [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
 new e4034ac  [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
 new a03c8f1  [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
 new cc59b3a  [FLINK-18020] Increase timeout in SQLClientKafkaITCase

The 5 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../apache/flink/tests/util/kafka/KafkaResource.java|  7 ---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java  | 17 +++--
 .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++--
 .../flink/tests/util/kafka/StreamingKafkaITCase.java|  8 ++--
 4 files changed, 32 insertions(+), 17 deletions(-)



[flink] branch master updated (5834373 -> 971e7b2)

2020-06-05 Thread aljoscha
This is an automated email from the ASF dual-hosted git repository.

aljoscha pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 5834373  [FLINK-18048] Fix --host option for standalone application cluster
 add c389ad0  [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
 add 455fb24  [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
 add ce7b41e  [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
 add f6bbff4  [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
 add 971e7b2  [FLINK-18020] Increase timeout in SQLClientKafkaITCase

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/tests/util/kafka/KafkaResource.java|  7 ---
 .../tests/util/kafka/LocalStandaloneKafkaResource.java  | 17 +++--
 .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++--
 .../flink/tests/util/kafka/StreamingKafkaITCase.java|  8 ++--
 4 files changed, 32 insertions(+), 17 deletions(-)



[flink] branch release-1.10 updated: [FLINK-18048] Fix --host option for standalone job cluster

2020-06-05 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.10 by this push:
 new 855f799  [FLINK-18048] Fix --host option for standalone job cluster
855f799 is described below

commit 855f799d9919348619f17cfcb4d6544dc02afdfb
Author: wangyang0918 
AuthorDate: Mon Jun 1 19:40:14 2020 +0800

[FLINK-18048] Fix --host option for standalone job cluster

This closes #12495.
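
For context, the parser factories build on Apache Commons CLI, where an option must be registered on the Options set before the parser accepts it; the one added line does exactly that. A minimal sketch of the failure mode, assuming commons-cli on the classpath (option and variable names here are illustrative):

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.DefaultParser;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;

final class HostOptionSketch {
	public static void main(String[] args) throws ParseException {
		final Option hostOption =
				Option.builder().longOpt("host").hasArg().desc("hostname to bind to").build();

		final Options withoutHost = new Options();              // option defined but never added
		final Options withHost = new Options().addOption(hostOption);

		final String[] cli = {"--host", "user-specified-hostname"};

		// new DefaultParser().parse(withoutHost, cli) would throw UnrecognizedOptionException.
		final CommandLine parsed = new DefaultParser().parse(withHost, cli);
		System.out.println(parsed.getOptionValue("host")); // prints user-specified-hostname
	}
}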
---
 .../StandaloneJobClusterConfigurationParserFactory.java   | 1 +
 .../StandaloneJobClusterConfigurationParserFactoryTest.java   | 8 
 2 files changed, 9 insertions(+)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
index a99e277..ea03313 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java
@@ -68,6 +68,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes
options.addOption(JOB_CLASS_NAME_OPTION);
options.addOption(JOB_ID_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
+   options.addOption(HOST_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
 
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
index 4a72d3a..ad1cdb5 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java
@@ -178,4 +178,12 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg
 	assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
}
 
+   @Test
+   public void testHostOption() throws FlinkParseException {
+   final String hostName = "user-specified-hostname";
+   final String[] args = {"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName};
+   final StandaloneJobClusterConfiguration applicationClusterConfiguration = commandLineParser.parse(args);
+   assertThat(applicationClusterConfiguration.getHostname(), is(hostName));
+   }
+
 }



[flink] branch release-1.11 updated: [FLINK-18048] Fix --host option for standalone application cluster

2020-06-05 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 39e3d6a  [FLINK-18048] Fix --host option for standalone application cluster
39e3d6a is described below

commit 39e3d6ad6ce25bc94a9515362a96c229f3d0c590
Author: wangyang0918 
AuthorDate: Mon Jun 1 17:30:12 2020 +0800

[FLINK-18048] Fix --host option for standalone application cluster

This closes #12426.
---
 .../StandaloneApplicationClusterConfigurationParserFactory.java   | 1 +
 ...tandaloneApplicationClusterConfigurationParserFactoryTest.java | 8 
 2 files changed, 9 insertions(+)

diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
index cc120c0..6c3bfb3 100644
--- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
+++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java
@@ -68,6 +68,7 @@ public class StandaloneApplicationClusterConfigurationParserFactory implements P
options.addOption(JOB_CLASS_NAME_OPTION);
options.addOption(JOB_ID_OPTION);
options.addOption(DYNAMIC_PROPERTY_OPTION);
+   options.addOption(HOST_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION);
options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION);
 
diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
index a12de88..9c7e94c 100644
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
+++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java
@@ -233,4 +233,12 @@ public class StandaloneApplicationClusterConfigurationParserFactoryTest extends
 	assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true));
}
 
+   @Test
+   public void testHostOption() throws FlinkParseException {
+   final String hostName = "user-specified-hostname";
+   final String[] args = {"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName};
+   final StandaloneApplicationClusterConfiguration applicationClusterConfiguration = commandLineParser.parse(args);
+   assertThat(applicationClusterConfiguration.getHostname(), is(hostName));
+   }
+
 }



[flink] branch master updated (61e6f70 -> 5834373)

2020-06-05 Thread kkloudas
This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 61e6f70  [FLINK-18076][table sql / client] Use correct classloader when parsing queries
 add 5834373  [FLINK-18048] Fix --host option for standalone application cluster

No new revisions were added by this update.

Summary of changes:
 .../StandaloneApplicationClusterConfigurationParserFactory.java   | 1 +
 ...tandaloneApplicationClusterConfigurationParserFactoryTest.java | 8 
 2 files changed, 9 insertions(+)



[flink] branch master updated (b39ef6c -> 61e6f70)

2020-06-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b39ef6c  [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL
 add 61e6f70  [FLINK-18076][table sql / client] Use correct classloader when parsing queries

No new revisions were added by this update.

Summary of changes:
 .../table/client/gateway/local/LocalExecutor.java  | 15 ++-
 .../table/client/gateway/local/DependencyTest.java | 50 --
 2 files changed, 51 insertions(+), 14 deletions(-)



[flink] branch release-1.11 updated: [FLINK-18076][table sql / client] Use correct classloader when parsing queries

2020-06-05 Thread dwysakowicz
This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new e7902bb  [FLINK-18076][table sql / client] Use correct classloader when parsing queries
e7902bb is described below

commit e7902bb4d1329833870ee53c782c6431cfc8cb80
Author: Leonard Xu 
AuthorDate: Thu Jun 4 23:53:07 2020 +0800

[FLINK-18076][table sql / client] Use correct classloader when parsing queries

This closes #12475
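
The delegating Parser in this change works because every call runs with the session's user classloader installed as the thread context classloader. A minimal sketch of that wrap-and-restore pattern (the method and parameter names are illustrative, not the exact ExecutionContext API):

import java.util.function.Supplier;

final class ClassLoaderWrapSketch {

	// Install the given classloader as the context classloader for the duration
	// of one call, then restore the previous one even if the call throws.
	static <T> T wrapClassLoader(ClassLoader userClassLoader, Supplier<T> call) {
		final Thread thread = Thread.currentThread();
		final ClassLoader previous = thread.getContextClassLoader();
		thread.setContextClassLoader(userClassLoader);
		try {
			return call.get(); // e.g. () -> parser.parse(statement)
		} finally {
			thread.setContextClassLoader(previous);
		}
	}
}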
---
 .../table/client/gateway/local/LocalExecutor.java  | 15 ++-
 .../table/client/gateway/local/DependencyTest.java | 50 --
 2 files changed, 51 insertions(+), 14 deletions(-)

diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index fd78712..604734e 100644
--- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -41,6 +41,7 @@ import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.TableResult;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.internal.TableEnvironmentInternal;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.client.SqlClientException;
 import org.apache.flink.table.client.config.Environment;
@@ -56,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult;
 import org.apache.flink.table.client.gateway.local.result.DynamicResult;
 import org.apache.flink.table.client.gateway.local.result.MaterializedResult;
 import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.logical.utils.LogicalTypeUtils;
 import org.apache.flink.table.types.utils.DataTypeUtils;
@@ -460,7 +462,18 @@ public class LocalExecutor implements Executor {
public Parser getSqlParser(String sessionId) {
final ExecutionContext context = getExecutionContext(sessionId);
final TableEnvironment tableEnv = context.getTableEnvironment();
-   return ((TableEnvironmentInternal) tableEnv).getParser();
+   final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser();
+   return new Parser() {
+   @Override
+   public List<Operation> parse(String statement) {
+   return context.wrapClassLoader(() -> parser.parse(statement));
+   }
+
+   @Override
+   public UnresolvedIdentifier parseIdentifier(String identifier) {
+   return context.wrapClassLoader(() -> parser.parseIdentifier(identifier));
+   }
+   };
}
 
@Override
diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
index 33633f3..87f750d 100644
--- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
+++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java
@@ -44,10 +44,12 @@ import org.apache.flink.table.client.gateway.SessionContext;
 import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil;
 import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase;
 import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase;
+import org.apache.flink.table.delegation.Parser;
 import org.apache.flink.table.descriptors.DescriptorProperties;
 import org.apache.flink.table.factories.CatalogFactory;
 import org.apache.flink.table.factories.ModuleFactory;
 import org.apache.flink.table.module.Module;
+import org.apache.flink.table.operations.Operation;
 import org.apache.flink.table.types.DataType;
 
 import org.junit.Test;
@@ -65,6 +67,7 @@ import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATA
 import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE;
 import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE;
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * Dependency tests for {@link LocalExecutor}. Mainly for testing classloading 
of dependencies.
@@ 

[flink] branch release-1.11 updated (ec65e38 -> ce63fd1)

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from ec65e38  [hotfix][docs] Remove HADOOP_CONF_DIR
 add ce63fd1  [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/sql/queries.md  | 14 +--
 docs/dev/table/sql/queries.zh.md   | 14 +--
 docs/dev/table/streaming/joins.md  |  8 +++---
 docs/dev/table/streaming/joins.zh.md   |  8 +++---
 docs/dev/table/streaming/match_recognize.md|  4 +--
 docs/dev/table/streaming/match_recognize.zh.md |  4 +--
 docs/dev/table/tableApi.md | 18 +++---
 docs/dev/table/tableApi.zh.md  | 18 +++---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  2 +-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  2 +-
 .../plan/metadata/FlinkRelMdUniqueKeys.scala   |  2 +-
 ...ndowJoin.scala => StreamExecIntervalJoin.scala} | 29 +++---
 .../FlinkChangelogModeInferenceProgram.scala   |  8 +++---
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  2 +-
 ...Rule.scala => StreamExecIntervalJoinRule.scala} | 20 +++
 .../rules/physical/stream/StreamExecJoinRule.scala |  6 ++---
 .../stream/StreamExecTemporalJoinRule.scala|  4 +--
 ...WindowJoinUtil.scala => IntervalJoinUtil.scala} |  4 +--
 .../apache/flink/table/api/stream/ExplainTest.xml  |  8 +++---
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 12 -
 .../{WindowJoinTest.xml => IntervalJoinTest.xml}   | 26 +--
 .../table/planner/plan/stream/table/JoinTest.xml   | 20 +++
 .../stream/sql/MiniBatchIntervalInferTest.scala|  2 +-
 ...WindowJoinTest.scala => IntervalJoinTest.scala} | 14 +--
 ...owJoinITCase.scala => IntervalJoinITCase.scala} |  8 +++---
 .../join/{ => interval}/EmitAwareCollector.java|  2 +-
 .../ProcTimeIntervalJoin.java} |  9 ---
 .../RowTimeIntervalJoin.java}  |  9 ---
 .../TimeIntervalJoin.java} | 20 ---
 .../ProcTimeIntervalJoinTest.java} | 15 +--
 .../RowTimeIntervalJoinTest.java}  | 22 
 .../TimeIntervalStreamJoinTestBase.java}   | 12 -
 32 files changed, 177 insertions(+), 169 deletions(-)
 rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/{StreamExecWindowJoin.scala => StreamExecIntervalJoin.scala} (92%)
 rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/{StreamExecWindowJoinRule.scala => StreamExecIntervalJoinRule.scala} (91%)
 rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/{WindowJoinUtil.scala => IntervalJoinUtil.scala} (99%)
 rename flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.xml => IntervalJoinTest.xml} (83%)
 rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.scala => IntervalJoinTest.scala} (97%)
 rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/{WindowJoinITCase.scala => IntervalJoinITCase.scala} (99%)
 rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ => interval}/EmitAwareCollector.java (96%)
 rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoin.java => interval/ProcTimeIntervalJoin.java} (86%)
 rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoin.java => interval/RowTimeIntervalJoin.java} (89%)
 rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoin.java => interval/TimeIntervalJoin.java} (97%)
 rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoinTest.java => interval/ProcTimeIntervalJoinTest.java} (94%)
 rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoinTest.java => interval/RowTimeIntervalJoinTest.java} (95%)
 rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoinTestBase.java => interval/TimeIntervalStreamJoinTestBase.java} (88%)



[flink] branch master updated (b48d330 -> b39ef6c)

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b48d330  [hotfix][docs] Remove HADOOP_CONF_DIR
 add b39ef6c  [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL

No new revisions were added by this update.

Summary of changes:
 docs/dev/table/sql/queries.md  | 14 +--
 docs/dev/table/sql/queries.zh.md   | 14 +--
 docs/dev/table/streaming/joins.md  |  8 +++---
 docs/dev/table/streaming/joins.zh.md   |  8 +++---
 docs/dev/table/streaming/match_recognize.md|  4 +--
 docs/dev/table/streaming/match_recognize.zh.md |  4 +--
 docs/dev/table/tableApi.md | 18 +++---
 docs/dev/table/tableApi.zh.md  | 18 +++---
 .../plan/metadata/FlinkRelMdColumnUniqueness.scala |  2 +-
 .../metadata/FlinkRelMdModifiedMonotonicity.scala  |  2 +-
 .../plan/metadata/FlinkRelMdUniqueKeys.scala   |  2 +-
 ...ndowJoin.scala => StreamExecIntervalJoin.scala} | 29 +++---
 .../FlinkChangelogModeInferenceProgram.scala   |  8 +++---
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  2 +-
 ...Rule.scala => StreamExecIntervalJoinRule.scala} | 20 +++
 .../rules/physical/stream/StreamExecJoinRule.scala |  6 ++---
 .../stream/StreamExecTemporalJoinRule.scala|  4 +--
 ...WindowJoinUtil.scala => IntervalJoinUtil.scala} |  4 +--
 .../apache/flink/table/api/stream/ExplainTest.xml  |  8 +++---
 .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 12 -
 .../{WindowJoinTest.xml => IntervalJoinTest.xml}   | 26 +--
 .../table/planner/plan/stream/table/JoinTest.xml   | 20 +++
 .../stream/sql/MiniBatchIntervalInferTest.scala|  2 +-
 ...WindowJoinTest.scala => IntervalJoinTest.scala} | 14 +--
 ...owJoinITCase.scala => IntervalJoinITCase.scala} |  8 +++---
 .../join/{ => interval}/EmitAwareCollector.java|  2 +-
 .../ProcTimeIntervalJoin.java} |  9 ---
 .../RowTimeIntervalJoin.java}  |  9 ---
 .../TimeIntervalJoin.java} | 20 ---
 .../ProcTimeIntervalJoinTest.java} | 15 +--
 .../RowTimeIntervalJoinTest.java}  | 22 
 .../TimeIntervalStreamJoinTestBase.java}   | 12 -
 32 files changed, 177 insertions(+), 169 deletions(-)
 rename 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/{StreamExecWindowJoin.scala
 => StreamExecIntervalJoin.scala} (92%)
 rename 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/{StreamExecWindowJoinRule.scala
 => StreamExecIntervalJoinRule.scala} (91%)
 rename 
flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/{WindowJoinUtil.scala
 => IntervalJoinUtil.scala} (99%)
 rename 
flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.xml
 => IntervalJoinTest.xml} (83%)
 rename 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.scala
 => IntervalJoinTest.scala} (97%)
 rename 
flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/{WindowJoinITCase.scala
 => IntervalJoinITCase.scala} (99%)
 rename 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{
 => interval}/EmitAwareCollector.java (96%)
 rename 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoin.java
 => interval/ProcTimeIntervalJoin.java} (86%)
 rename 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoin.java
 => interval/RowTimeIntervalJoin.java} (89%)
 rename 
flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoin.java
 => interval/TimeIntervalJoin.java} (97%)
 rename 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoinTest.java
 => interval/ProcTimeIntervalJoinTest.java} (94%)
 rename 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoinTest.java
 => interval/RowTimeIntervalJoinTest.java} (95%)
 rename 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoinTestBase.java
 => interval/TimeIntervalStreamJoinTestBase.java} (88%)
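
Since the renames above only make sense if one knows what the new name refers to: an interval join correlates two streams on a key while bounding their event times relative to each other. A minimal sketch against the blink planner, with invented table names, schemas, and connector settings (planning only, no job execution):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class IntervalJoinSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        // Hypothetical sources; a real job would use proper connectors.
        tEnv.executeSql("CREATE TABLE Orders (id INT, rowtime TIMESTAMP(3), "
                + "WATERMARK FOR rowtime AS rowtime) WITH ('connector' = 'datagen')");
        tEnv.executeSql("CREATE TABLE Shipments (orderId INT, rowtime TIMESTAMP(3), "
                + "WATERMARK FOR rowtime AS rowtime) WITH ('connector' = 'datagen')");
        // The BETWEEN predicate over the two rowtimes is what makes this an
        // *interval* join rather than a regular streaming join.
        System.out.println(tEnv.explainSql(
                "SELECT o.id FROM Orders o JOIN Shipments s "
                + "ON o.id = s.orderId "
                + "AND s.rowtime BETWEEN o.rowtime AND o.rowtime + INTERVAL '4' HOUR"));
    }
}
```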



[flink] branch release-1.11 updated: [hotfix][docs] Remove HADOOP_CONF_DIR

2020-06-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new ec65e38  [hotfix][docs] Remove HADOOP_CONF_DIR
ec65e38 is described below

commit ec65e38159cab82beb869a1f169cce977a12d159
Author: Robert Metzger 
AuthorDate: Thu Jun 4 21:13:12 2020 +0200

[hotfix][docs] Remove HADOOP_CONF_DIR

The Hadoop configuration page was quite confusing because it suggested that
the user had to set HADOOP_CONF_DIR as well. All jars and configs can and
should be passed through HADOOP_CLASSPATH. Period.

This closes #12491
---
 docs/ops/deployment/hadoop.md| 21 +
 docs/ops/deployment/hadoop.zh.md | 21 +
 2 files changed, 10 insertions(+), 32 deletions(-)

diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md
index 71914a6..7b5e5bc 100644
--- a/docs/ops/deployment/hadoop.md
+++ b/docs/ops/deployment/hadoop.md
@@ -32,7 +32,7 @@ under the License.
 In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide 
Flink with the required Hadoop classes,
 as these are not bundled by default.
 
-This can be done by adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
+The recommended approach is adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
 
 Flink will use the environment variable `HADOOP_CLASSPATH` to augment the
 classpath that is used when starting Flink components such as the Client,
@@ -44,27 +44,16 @@ that are running Flink components.
 When running on YARN, this is usually not a problem because the components
 running inside YARN will be started with the Hadoop classpaths, but it can
 happen that the Hadoop dependencies must be in the classpath when submitting a
-job to YARN. For this, it's usually enough to do a
+job to YARN. For this, it's usually enough to run
 
 {% highlight bash %}
 export HADOOP_CLASSPATH=`hadoop classpath`
 {% endhighlight %}
 
-in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath.
+in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath. The 
classpath returned by `hadoop classpath` also includes the Hadoop configuration 
directories.
 
-Putting the Hadoop configuration in the same class path as the Hadoop 
libraries makes Flink pick up that configuration.
-
-## Referencing a Hadoop configuration
-
-You can reference a Hadoop configuration by setting the environment variable 
`HADOOP_CONF_DIR`.
-
-```sh
-HADOOP_CONF_DIR=/path/to/etc/hadoop
-```
-
-Referencing the HDFS configuration in the [Flink configuration]({{ 
site.baseurl }}/ops/config.html#hdfs) is deprecated.
-
-Another way to provide the Hadoop configuration is to have it on the class 
path of the Flink process, see more details above.
+If you are manually assembling the `HADOOP_CLASSPATH` variable, we recommend
+adding the Hadoop configuration directories as well.
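
For illustration, the reason classpath placement suffices: Hadoop's own `Configuration` class loads `core-default.xml` and `core-site.xml` from the classpath, so a configuration directory on `HADOOP_CLASSPATH` becomes visible to any JVM started with that classpath, Flink included. A minimal sketch, assuming `hadoop-common` is available:

```java
import org.apache.hadoop.conf.Configuration;

public class ClasspathConfigCheck {
    public static void main(String[] args) {
        // new Configuration() loads core-default.xml and core-site.xml from the
        // classpath; if the Hadoop conf directory is on HADOOP_CLASSPATH, the
        // cluster's settings (e.g. fs.defaultFS) are picked up automatically.
        Configuration conf = new Configuration();
        System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
    }
}
```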
 
 ## Running a job locally
 
diff --git a/docs/ops/deployment/hadoop.zh.md b/docs/ops/deployment/hadoop.zh.md
index f9cf3e1..c73a000 100644
--- a/docs/ops/deployment/hadoop.zh.md
+++ b/docs/ops/deployment/hadoop.zh.md
@@ -32,7 +32,7 @@ under the License.
 In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide 
Flink with the required Hadoop classes,
 as these are not bundled by default.
 
-This can be done by adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
+The recommended approach is adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
 
 Flink will use the environment variable `HADOOP_CLASSPATH` to augment the
 classpath that is used when starting Flink components such as the Client,
@@ -44,27 +44,16 @@ that are running Flink components.
 When running on YARN, this is usually not a problem because the components
 running inside YARN will be started with the Hadoop classpaths, but it can
 happen that the Hadoop dependencies must be in the classpath when submitting a
-job to YARN. For this, it's usually enough to do a
+job to YARN. For this, it's usually enough to run
 
 {% highlight bash %}
 export HADOOP_CLASSPATH=`hadoop classpath`
 {% endhighlight %}
 
-in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath.
+in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath. The 
classpath returned by `hadoop classpath` also includes the Hadoop configuration 
directories.
 
-Putting the Hadoop configuration in the same class path 

[flink] branch master updated: [hotfix][docs] Remove HADOOP_CONF_DIR

2020-06-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new b48d330  [hotfix][docs] Remove HADOOP_CONF_DIR
b48d330 is described below

commit b48d330a9f09b1a30077dc7666dd95d942013765
Author: Robert Metzger 
AuthorDate: Thu Jun 4 21:13:12 2020 +0200

[hotfix][docs] Remove HADOOP_CONF_DIR

The Hadoop configuration page was quite confusing because it suggested that
the user had to set HADOOP_CONF_DIR as well. All jars and configs can and
should be passed through HADOOP_CLASSPATH. Period.

This closes #12491
---
 docs/ops/deployment/hadoop.md| 21 +
 docs/ops/deployment/hadoop.zh.md | 21 +
 2 files changed, 10 insertions(+), 32 deletions(-)

diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md
index 71914a6..7b5e5bc 100644
--- a/docs/ops/deployment/hadoop.md
+++ b/docs/ops/deployment/hadoop.md
@@ -32,7 +32,7 @@ under the License.
 In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide 
Flink with the required Hadoop classes,
 as these are not bundled by default.
 
-This can be done by adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
+The recommended approach is adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
 
 Flink will use the environment variable `HADOOP_CLASSPATH` to augment the
 classpath that is used when starting Flink components such as the Client,
@@ -44,27 +44,16 @@ that are running Flink components.
 When running on YARN, this is usually not a problem because the components
 running inside YARN will be started with the Hadoop classpaths, but it can
 happen that the Hadoop dependencies must be in the classpath when submitting a
-job to YARN. For this, it's usually enough to do a
+job to YARN. For this, it's usually enough to run
 
 {% highlight bash %}
 export HADOOP_CLASSPATH=`hadoop classpath`
 {% endhighlight %}
 
-in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath.
+in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath. The 
classpath returned by `hadoop classpath` also includes the Hadoop configuration 
directories.
 
-Putting the Hadoop configuration in the same class path as the Hadoop 
libraries makes Flink pick up that configuration.
-
-## Referencing a Hadoop configuration
-
-You can reference a Hadoop configuration by setting the environment variable 
`HADOOP_CONF_DIR`.
-
-```sh
-HADOOP_CONF_DIR=/path/to/etc/hadoop
-```
-
-Referencing the HDFS configuration in the [Flink configuration]({{ 
site.baseurl }}/ops/config.html#hdfs) is deprecated.
-
-Another way to provide the Hadoop configuration is to have it on the class 
path of the Flink process, see more details above.
+If you are manually assembling the `HADOOP_CLASSPATH` variable, we recommend
+adding the Hadoop configuration directories as well.
 
 ## Running a job locally
 
diff --git a/docs/ops/deployment/hadoop.zh.md b/docs/ops/deployment/hadoop.zh.md
index f9cf3e1..c73a000 100644
--- a/docs/ops/deployment/hadoop.zh.md
+++ b/docs/ops/deployment/hadoop.zh.md
@@ -32,7 +32,7 @@ under the License.
 In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide 
Flink with the required Hadoop classes,
 as these are not bundled by default.
 
-This can be done by adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
+The recommended approach is adding the Hadoop classpath to Flink through the 
`HADOOP_CLASSPATH` environment variable.
 
 Flink will use the environment variable `HADOOP_CLASSPATH` to augment the
 classpath that is used when starting Flink components such as the Client,
@@ -44,27 +44,16 @@ that are running Flink components.
 When running on YARN, this is usually not a problem because the components
 running inside YARN will be started with the Hadoop classpaths, but it can
 happen that the Hadoop dependencies must be in the classpath when submitting a
-job to YARN. For this, it's usually enough to do a
+job to YARN. For this, it's usually enough to run
 
 {% highlight bash %}
 export HADOOP_CLASSPATH=`hadoop classpath`
 {% endhighlight %}
 
-in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath.
+in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is 
an argument that will make it print the configured Hadoop classpath. The 
classpath returned by `hadoop classpath` also includes the Hadoop configuration 
directories.
 
-Putting the Hadoop configuration in the same class path as the 

[flink] branch release-1.11 updated: [FLINK-17974][docs] Extend docker documentation

2020-06-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new b20ea34  [FLINK-17974][docs] Extend docker documentation
b20ea34 is described below

commit b20ea34d3c10254a82e46875419cd98d2986085e
Author: Robert Metzger 
AuthorDate: Thu Jun 4 14:41:53 2020 +0200

[FLINK-17974][docs] Extend docker documentation

This closes #12490
---
 docs/ops/deployment/docker.md| 59 +-
 docs/ops/deployment/docker.zh.md | 61 ++--
 2 files changed, 91 insertions(+), 29 deletions(-)

diff --git a/docs/ops/deployment/docker.md b/docs/ops/deployment/docker.md
index a1775c3..2525185 100644
--- a/docs/ops/deployment/docker.md
+++ b/docs/ops/deployment/docker.md
@@ -46,12 +46,12 @@ For example, you can use the following aliases:
 * `flink:latest` → `flink:<flink-version>-scala_<scala-version>`
 * `flink:1.11` → `flink:1.11.<patch-version>-scala_2.11`
 
-Note Prio to Flink 1.5 version, Hadoop 
dependencies were always bundled with Flink.
+Note Prior to Flink 1.5 version, Hadoop 
dependencies were always bundled with Flink.
 You can see that certain tags include the version of Hadoop, e.g. (e.g. 
`-hadoop28`).
 Beginning with Flink 1.5, image tags that omit the Hadoop version correspond 
to Hadoop-free releases of Flink
 that do not include a bundled Hadoop distribution.
 
-## How to run Flink image
+## How to run a Flink image
 
 The Flink image contains a regular Flink distribution with its default 
configuration and a standard entry point script.
 You can run its entry point in the following modes:
@@ -70,21 +70,43 @@ and deploys *TaskManagers* on demand so that you do not 
have to do it manually.
 
 The next chapters describe how to start a single Flink Docker container for 
various purposes.
 
+Once you've started Flink on Docker, you can access the Flink Webfrontend on 
[localhost:8081](http://localhost:8081/#/overview) or submit jobs like this 
`./bin/flink run ./examples/streaming/TopSpeedWindowing.jar`.
+
+We recommend using [Docker 
Compose](docker.html#session-cluster-with-docker-compose) or [Docker 
Swarm](docker.html#session-cluster-with-docker-swarm) for deploying Flink as a 
Session Cluster to ease system configuration.
+
 ### Start a Session Cluster
 
 A *Flink Session cluster* can be used to run multiple jobs. Each job needs to 
be submitted to the cluster after it has been deployed.
-To deploy a *Flink Session cluster* with Docker, you need to start a *Flink 
Master* container:
+To deploy a *Flink Session cluster* with Docker, you need to start a *Flink 
Master* container. To enable communication between the containers, we first set 
a required Flink configuration property and create a network:
+```sh
+FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
+docker network create flink-network
+```
+
+Then we launch the JobManager:
 
 ```sh
-docker run flink:{% if site.is_stable 
%}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif 
%} jobmanager
+docker run \
+--rm \
+--name=jobmanager \
+--network flink-network \
+-p 8081:8081 \
+--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
+flink:{% if site.is_stable 
%}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif 
%} jobmanager
 ```
 
 and one or more *TaskManager* containers:
 
 ```sh
-docker run flink:{% if site.is_stable 
%}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif 
%} taskmanager
+docker run \
+--rm \
+--name=taskmanager \
+--network flink-network \
+--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
+flink:{% if site.is_stable 
%}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif 
%} taskmanager
 ```
 
+
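Once the session cluster is up, a job can also be submitted programmatically instead of via `./bin/flink run`. A minimal sketch; the jar path is a hypothetical artifact containing the job classes:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class DockerSessionSmokeTest {
    public static void main(String[] args) throws Exception {
        // Connect to the JobManager container published on localhost:8081.
        StreamExecutionEnvironment env = StreamExecutionEnvironment
                .createRemoteEnvironment("localhost", 8081, "/path/to/smoke-test.jar");
        env.fromElements(1, 2, 3)
           .map(new MapFunction<Integer, Integer>() {
               @Override
               public Integer map(Integer value) {
                   return value * 2;
               }
           })
           .print();
        env.execute("docker-session-smoke-test");
    }
}
```
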
 ### Start a Job Cluster
 
 A *Flink Job cluster* is a dedicated cluster which runs a single job.
@@ -105,18 +127,26 @@ To make the **job artifacts available** locally in the 
container, you can
 the *Flink Master* and *TaskManagers*:
 
 ```sh
+FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager"
+docker network create flink-network
+
 docker run \
 --mount 
type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \
 --mount 
type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \
+--rm \
+--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \
+--name=jobmanager \
+--network flink-network \
 flink:{% if site.is_stable 
%}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif 
%} standalone-job \
 --job-classname com.job.ClassName \
---job-id  \
+[--job-id ] \
 [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \
 [job arguments]
 
 docker run \
 --mount 

[flink] branch master updated (d991b1a -> 13bae63)

2020-06-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from d991b1a  [FLINK-17816][metrics] Schedule latency markers with fixed 
delay
 add 13bae63  [FLINK-17974][docs] Extend docker documentation

No new revisions were added by this update.

Summary of changes:
 docs/ops/deployment/docker.md| 59 +-
 docs/ops/deployment/docker.zh.md | 61 ++--
 2 files changed, 91 insertions(+), 29 deletions(-)



[flink-benchmarks] branch master updated: [FLINK-17287][github] Disable merge commit button

2020-06-05 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git


The following commit(s) were added to refs/heads/master by this push:
 new 8b44986  [FLINK-17287][github] Disable merge commit button
8b44986 is described below

commit 8b449865cf733dbb3c01e997fe44b1a5b6f82cdc
Author: Nico Kruber 
AuthorDate: Fri Jun 5 11:46:38 2020 +0200

[FLINK-17287][github] Disable merge commit button
---
 .asf.yaml | 5 +
 1 file changed, 5 insertions(+)

diff --git a/.asf.yaml b/.asf.yaml
new file mode 100644
index 000..930c896
--- /dev/null
+++ b/.asf.yaml
@@ -0,0 +1,5 @@
+github:
+  enabled_merge_buttons:
+squash: true
+merge: false
+rebase: true



[flink-benchmarks] branch master updated (c1db02d -> a37619d)

2020-06-05 Thread nkruber
This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git.


from c1db02d  [init] Add ASL 2.0 License
 new e16896a  Add skeleton for micro benchmarks with first benchmark
 new 0134b6a  Add KeyByBenchmark
 new 72afb9b  Add TimeWindowBenchmark
 new a68dca7  Dump results to csv file
 new 0016349  Add benchmarks for different state backends
 new 469188b  make KeyByBenchmark#SumReduce a static inner class
 new e4e9024  Extract user functions
 new a7947c4  Improve codespeed
 new 0c33a3e  Extract StateBackendBenchmark
 new c03925f  Reduce number of parameters
 new 90ab919  Bump number of iterations to production ready values
 new e1267b6  Remove fork value from pom file
 new 2953086  Change units in benchmarks
 new 35aeff7  Extract common logic for IntLong benchmarks
 new d42afaf  Benchmark state using tumbling windows
 new df6a089  Use different number of operations per invocation in memory 
and rocks benchmarks
 new 621f73e  Make all benchmarks inherit from BenchmarkBase
 new 8fd1ae4  Extract BenchmarkBase for non integration Flink benchmarks
 new b662f8e  Clean up pom.xml
 new ca39899  Update Flink to 1.5-SNAPSHOT
 new 58687db  Run NetworkBenchmarks from flink
 new aca8216  Ignore empty parameters in benchmark name generation
 new 313bfd7  add benchmarks for the keyBy() operation on tuples and arrays
 new 4a453df  Remove incorrect comment from pom
 new 062f96b  Add README.md
 new 9e6eee1  [hotfix] make KeyByBenchmarks#IncreasingBaseSource more 
generic, creating BaseSourceWithKeyRange
 new 8cd86ae  add SerializationFrameworkBenchmarks for Pojo, Tuple, Kryo, 
and Avro
 new 426ac4c  Update default JVM options and bump default Flink version to 
1.7
 new a1d2ebe  Add network 1000,1ms benchmark
 new a8bc679  cleanup pom.xml (#8)
 new da30375  Add broadcast network 100,100ms benchmark
 new 9a95e89  clarify licensing situation with respect to jmh
 new 84532bd  add SSL network benchmarks and change overall benchmark set
 new 2ddbb7a  Add general remarks to the README
 new 21b97fe  [FLINK-11986] [state backend, tests] Add micro benchmark for 
state operations
 new 8b71d9c  Add support to generate a shaded benchmark package to allow 
running specific cases from the command line
 new 5d7aec8  [hotfix] Missing quotes in example maven command in README
 new 97ffd65  [hotfix] Exclude state benchmark from the Flink suite
 new c25d921  [hotfix] Fix NoClassDefFoundError when running network 
related benchmarks
 new 136c166  Run all benchmarks once on Travis CI
 new ca2d06b  Use maven profile to differentiate travis and jenkins run
 new cd16e99  Update the dependent flink version to 1.9-SNAPSHOT
 new 4372ba0  Add InputBenchmark
 new 70fb9bd  Explain naming convention in the README
 new 9c338a9  Add couple of details to the README and a note how to pick 
length of a new benchmark
 new b2c1aa2  Add TwoInputBenchmark
 new 75d7b6e  [FLINK-11877] Add an input idle benchmark for two input 
operators
 new caf4917  [hotfix] Fix typo in static variable
 new 3b55f59  [hotfix] Fix formatting of pom.xml
 new 9e3833a  [hotfix] Add missing dependencies
 new ab0015d  [FLINK-12818] Improve stability of twoInputMapSink benchmark
 new a4ae3ae  [hotfix] adapt StreamNetworkThroughputBenchmarkExecutor to 
latest changes in Flink
 new c2f6793  Delete root directory when tearing down the benchmark
 new 15064ae  Make list state benchmark stable
 new d47b530  Update flink-tests artifact id
 new 6ba88de  Adding benchmark for AsyncWaitOperator
 new 707905f  [FLINK-14118] Add throughput benchmark executor for data skew 
scenario. (#31)
 new 46b7ce9  [FLINK-14118] Bump number of writer threads in network data 
skew Benchmark to 10
 new 735bd06  [hotfix] Update flink version from 1.9 to 1.10.
 new 91d5e0b  [hotfix] Increase max memory of the test jvm to 6g.
 new fd337c6  [FLINK-14783] Add ContinuousFileReaderOperator benchmark (#34)
 new cb73ac5  [FLINK-13846] Add benchmark of mapstate isEmpty
 new 4625672  [FLINK-14346] [serialization] Add string-heavy version of 
SerializationFrameworkMiniBenchmark
 new cd6f0a3  [hotfix] Make StateBackendContext extends 
FlinkEnvironmentContext for deduplication.
 new f54e93b  [FLINK-15103] Set number of network buffers to be the same as 
before flip49 for all benchmarks using local executor.
 new 322d34d  [hotfix] Bump default Flink version to 1.11-SNAPSHOT
 new bbac57c  [FLINK-15199] Fix compile error after FLINK-14926
 new e64a0e0  [FLINK-15070] Add benchmarks for blocking partition
 new a63a4bc  fixup! [FLINK-15103] Set number of network buffers to be the 
same as before flip49 for all benchmarks using local executor.
 new cfcbb96  

[flink] branch release-1.11 updated (3fe471f -> 56e9f84)

2020-06-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 3fe471f  [FLINK-17553][table] Fix plan error when constant exists in 
group window key
 add 56e9f84  [FLINK-18042][build] Bump flink-shaded to 11.0

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/flink/test/util/SecureTestEnvironment.java   | 4 ++--
 pom.xml   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)



[flink] branch master updated (826a075 -> d991b1a)

2020-06-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 826a075  [FLINK-18042][build] Bump flink-shaded to 11.0
 add d991b1a  [FLINK-17816][metrics] Schedule latency markers with fixed 
delay

No new revisions were added by this update.

Summary of changes:
 .../java/org/apache/flink/streaming/api/operators/StreamSource.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)
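
The one-line change above, per the commit title, schedules latency-marker emission with a fixed delay instead of a fixed rate. A JDK-only sketch of the behavioral difference (an illustration of the scheduling semantics, not Flink's internals):

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

public class FixedDelayVsFixedRate {
    public static void main(String[] args) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        // scheduleAtFixedRate tries to "catch up" after a slow run, which can
        // emit a burst of back-to-back executions under load.
        // scheduleWithFixedDelay measures the 500 ms delay from the *end* of
        // each run, so a slow run can never trigger a catch-up burst.
        timer.scheduleWithFixedDelay(
                () -> System.out.println("marker at " + System.nanoTime()),
                0, 500, TimeUnit.MILLISECONDS);
    }
}
```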



[flink] branch master updated (c0673a2 -> 826a075)

2020-06-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c0673a2  [FLINK-18038][statebackend] Log application-defined state 
backends after configuration
 add 826a075  [FLINK-18042][build] Bump flink-shaded to 11.0

No new revisions were added by this update.

Summary of changes:
 .../main/java/org/apache/flink/test/util/SecureTestEnvironment.java   | 4 ++--
 pom.xml   | 4 ++--
 2 files changed, 4 insertions(+), 4 deletions(-)



[flink] branch master updated (c67109e -> c0673a2)

2020-06-05 Thread chesnay
This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from c67109e  [FLINK-17553][table] Fix plan error when constant exists in 
group window key
 add c0673a2  [FLINK-18038][statebackend] Log application-defined state 
backends after configuration

No new revisions were added by this update.

Summary of changes:
 .../org/apache/flink/runtime/state/StateBackendLoader.java | 10 +-
 1 file changed, 5 insertions(+), 5 deletions(-)



[flink] branch release-1.11 updated (78b3f48 -> 3fe471f)

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 78b3f48  [FLINK-18142][hive] Wrong state names in 
HiveContinuousMonitoringFunction
 add 3fe471f  [FLINK-17553][table] Fix plan error when constant exists in 
group window key

No new revisions were added by this update.

Summary of changes:
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  4 +++
 .../table/planner/plan/stream/sql/RankTest.xml | 18 ++
 .../plan/stream/sql/agg/WindowAggregateTest.xml| 29 ++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 14 +++
 .../runtime/stream/sql/WindowAggregateITCase.scala | 28 +
 5 files changed, 83 insertions(+), 10 deletions(-)



[flink] branch master updated (46d42e5 -> c67109e)

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 46d42e5  [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing 
in the compile stage
 add c67109e  [FLINK-17553][table] Fix plan error when constant exists in 
group window key

No new revisions were added by this update.

Summary of changes:
 .../planner/plan/rules/FlinkStreamRuleSets.scala   |  4 +++
 .../table/planner/plan/stream/sql/RankTest.xml | 18 ++
 .../plan/stream/sql/agg/WindowAggregateTest.xml| 29 ++
 .../plan/stream/sql/agg/WindowAggregateTest.scala  | 14 +++
 .../runtime/stream/sql/WindowAggregateITCase.scala | 28 +
 5 files changed, 83 insertions(+), 10 deletions(-)
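
A hypothetical query of the shape this fix targets: a constant literal in the GROUP BY next to the group window. Table name, schema, and connector are invented for illustration; planning such a statement is what exercised the bug:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class GroupWindowConstantKey {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(
                EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build());
        tEnv.executeSql("CREATE TABLE T (a STRING, ts TIMESTAMP(3), "
                + "WATERMARK FOR ts AS ts) WITH ('connector' = 'datagen')");
        // Note the constant 'const' alongside the group window key; explaining
        // the plan is enough to hit the affected code path.
        System.out.println(tEnv.explainSql(
                "SELECT a, COUNT(*) FROM T "
                + "GROUP BY a, 'const', TUMBLE(ts, INTERVAL '1' MINUTE)"));
    }
}
```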



[flink] branch master updated: [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile stage

2020-06-05 Thread rmetzger
This is an automated email from the ASF dual-hosted git repository.

rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
 new 46d42e5  [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing 
in the compile stage
46d42e5 is described below

commit 46d42e5b718ea2cd914dc7feaf4425e04c3e4645
Author: Robert Metzger 
AuthorDate: Tue Jun 2 15:07:44 2020 +0200

[FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile 
stage

This closes #12447
---
 tools/ci/compile.sh | 65 ++---
 1 file changed, 42 insertions(+), 23 deletions(-)

diff --git a/tools/ci/compile.sh b/tools/ci/compile.sh
index b0b4803..00665ec 100755
--- a/tools/ci/compile.sh
+++ b/tools/ci/compile.sh
@@ -47,35 +47,54 @@ run_mvn clean install $MAVEN_OPTS 
-Dflink.convergence.phase=install -Pcheck-conv
 
 EXIT_CODE=$?
 
-if [ $EXIT_CODE == 0 ]; then
+if [ $EXIT_CODE != 0 ]; then
 echo 
"=="
-echo "Checking scala suffixes"
+echo "Compiling Flink failed."
 echo 
"=="
+exit $EXIT_CODE
+fi
 
-${CI_DIR}/verify_scala_suffixes.sh "${PROFILE}"
-EXIT_CODE=$?
-else
-echo 
"=="
-echo "Previous build failure detected, skipping scala-suffixes check."
-echo 
"=="
+echo " Checking Javadocs "
+
+# use the same invocation as on buildbot 
(https://svn.apache.org/repos/infra/infrastructure/buildbot/aegis/buildmaster/master1/projects/flink.conf)
+run_mvn javadoc:aggregate -Paggregate-scaladoc 
-DadditionalJOption='-Xdoclint:none' \
+  -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true 
-Denforcer.skip=true \
+  -Dheader=someTestHeader > javadoc.out
+EXIT_CODE=$?
+if [ $EXIT_CODE != 0 ] ; then
+  echo "ERROR in Javadocs. Printing full output:"
+  cat javadoc.out ; rm javadoc.out
+  exit $EXIT_CODE
 fi
 
-if [ $EXIT_CODE == 0 ]; then
-check_shaded_artifacts
-EXIT_CODE=$(($EXIT_CODE+$?))
-check_shaded_artifacts_s3_fs hadoop
-EXIT_CODE=$(($EXIT_CODE+$?))
-check_shaded_artifacts_s3_fs presto
-EXIT_CODE=$(($EXIT_CODE+$?))
-check_shaded_artifacts_connector_elasticsearch 5
-EXIT_CODE=$(($EXIT_CODE+$?))
-check_shaded_artifacts_connector_elasticsearch 6
-EXIT_CODE=$(($EXIT_CODE+$?))
-else
-echo 
"=="
-echo "Previous build failure detected, skipping shaded dependency check."
-echo 
"=="
+echo " Checking Scaladocs "
+
+cd flink-scala
+run_mvn scala:doc 2> scaladoc.out
+EXIT_CODE=$?
+if [ $EXIT_CODE != 0 ] ; then
+  echo "ERROR in Scaladocs. Printing full output:"
+  cat scaladoc.out ; rm scaladoc.out
+  exit $EXIT_CODE
 fi
+cd ..
+
+echo " Checking scala suffixes "
+
+${CI_DIR}/verify_scala_suffixes.sh "${PROFILE}" || exit $?
+
+echo " Checking shaded dependencies "
+
+check_shaded_artifacts
+EXIT_CODE=$(($EXIT_CODE+$?))
+check_shaded_artifacts_s3_fs hadoop
+EXIT_CODE=$(($EXIT_CODE+$?))
+check_shaded_artifacts_s3_fs presto
+EXIT_CODE=$(($EXIT_CODE+$?))
+check_shaded_artifacts_connector_elasticsearch 5
+EXIT_CODE=$(($EXIT_CODE+$?))
+check_shaded_artifacts_connector_elasticsearch 6
+EXIT_CODE=$(($EXIT_CODE+$?))
 
 exit $EXIT_CODE
 



[flink] branch release-1.11 updated: [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction

2020-06-05 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 78b3f48  [FLINK-18142][hive] Wrong state names in 
HiveContinuousMonitoringFunction
78b3f48 is described below

commit 78b3f48d5d13cb64cecccd833cc1dd18c8d520e9
Author: Jingsong Lee 
AuthorDate: Fri Jun 5 15:50:54 2020 +0800

[FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction


This closes #12497
---
 .../flink/connectors/hive/read/HiveContinuousMonitoringFunction.java  | 4 ++--
 .../java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java  | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)

diff --git 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
index cdd1e76..893e880 100644
--- 
a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
+++ 
b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java
@@ -171,13 +171,13 @@ public class HiveContinuousMonitoringFunction
public void initializeState(FunctionInitializationContext context) 
throws Exception {
this.currReadTimeState = 
context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
-   "partition-monitoring-state",
+   "current-read-time-state",
LongSerializer.INSTANCE
)
);
this.distinctPartsState = 
context.getOperatorStateStore().getListState(
new ListStateDescriptor<>(
-   "partition-monitoring-state",
+   "distinct-partitions-state",
new ListSerializer<>(new 
ListSerializer<>(StringSerializer.INSTANCE))
)
);
diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
index 317943a..32a5cd4 100644
--- 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
+++ 
b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java
@@ -464,6 +464,7 @@ public class HiveTableSourceITCase extends 
BatchAbstractTestBase {
final String dbName = "source_db";
final String tblName = "stream_test";
StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+   env.enableCheckpointing(100);
StreamTableEnvironment tEnv = 
HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE);
tEnv.registerCatalog(catalogName, hiveCatalog);
tEnv.useCatalog(catalogName);
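
Why the duplicated name was a bug: operator state is identified by its descriptor name, so two logical states registered under one name alias the same underlying state (or clash on a serializer mismatch at restore). A minimal sketch of the corrected pattern, outside the actual Hive connector source:

```java
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeutils.base.LongSerializer;
import org.apache.flink.api.common.typeutils.base.StringSerializer;

public class DistinctStateNames {
    public static void main(String[] args) {
        // Each logical piece of state gets its own, unique descriptor name;
        // the state backend keys state by this name, so reusing a name would
        // make both descriptors point at the same stored state.
        ListStateDescriptor<Long> currReadTime =
                new ListStateDescriptor<>("current-read-time-state", LongSerializer.INSTANCE);
        ListStateDescriptor<String> distinctParts =
                new ListStateDescriptor<>("distinct-partitions-state", StringSerializer.INSTANCE);
        System.out.println(currReadTime.getName() + " / " + distinctParts.getName());
    }
}
```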



[flink] branch master updated (b28cca2 -> d3dbf61)

2020-06-05 Thread lzljs3620320
This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from b28cca2  [FLINK-18059][sql-client] Fix create/drop catalog statement 
can not be executed in sql client
 add d3dbf61  [FLINK-18142][hive] Wrong state names in 
HiveContinuousMonitoringFunction

No new revisions were added by this update.

Summary of changes:
 .../flink/connectors/hive/read/HiveContinuousMonitoringFunction.java  | 4 ++--
 .../java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java  | 1 +
 2 files changed, 3 insertions(+), 2 deletions(-)



[flink] branch release-1.11 updated: [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
 new 89be0b1  [FLINK-18059][sql-client] Fix create/drop catalog statement 
can not be executed in sql client
89be0b1 is described below

commit 89be0b1c12da7a0c72f0db304479767db8f72488
Author: godfrey he 
AuthorDate: Fri Jun 5 15:13:09 2020 +0800

[FLINK-18059][sql-client] Fix create/drop catalog statement can not be 
executed in sql client

This closes #12435
---
 .../apache/flink/table/client/cli/CliClient.java   | 111 +
 .../apache/flink/table/client/cli/CliStrings.java  |   8 +-
 .../flink/table/client/cli/CliClientTest.java  |  14 +++
 .../flink/table/client/cli/TestingExecutor.java|  15 ++-
 .../table/client/cli/TestingExecutorBuilder.java   |  12 ++-
 5 files changed, 68 insertions(+), 92 deletions(-)

diff --git 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index ffb2d0e..b1acc76 100644
--- 
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++ 
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -313,10 +313,10 @@ public class CliClient {
callInsert(cmdCall);
break;
case CREATE_TABLE:
-   callCreateTable(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_TABLE_CREATED);
break;
case DROP_TABLE:
-   callDropTable(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_TABLE_REMOVED);
break;
case CREATE_VIEW:
callCreateView(cmdCall);
@@ -325,28 +325,37 @@ public class CliClient {
callDropView(cmdCall);
break;
case CREATE_FUNCTION:
-   callCreateFunction(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_FUNCTION_CREATED);
break;
case DROP_FUNCTION:
-   callDropFunction(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_FUNCTION_REMOVED);
break;
case ALTER_FUNCTION:
-   callAlterFunction(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED,
+   
CliStrings.MESSAGE_ALTER_FUNCTION_FAILED);
break;
case SOURCE:
callSource(cmdCall);
break;
case CREATE_DATABASE:
-   callCreateDatabase(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_DATABASE_CREATED);
break;
case DROP_DATABASE:
-   callDropDatabase(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_DATABASE_REMOVED);
break;
case ALTER_DATABASE:
-   callAlterDatabase(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_DATABASE_SUCCEEDED,
+   
CliStrings.MESSAGE_ALTER_DATABASE_FAILED);
break;
case ALTER_TABLE:
-   callAlterTable(cmdCall);
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED,
+   
CliStrings.MESSAGE_ALTER_TABLE_FAILED);
+   break;
+   case CREATE_CATALOG:
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_CATALOG_CREATED);
+   break;
+   case DROP_CATALOG:
+   callDdl(cmdCall.operands[0], 
CliStrings.MESSAGE_CATALOG_REMOVED);
break;
default:
throw new SqlClientException("Unsupported 
command: " + cmdCall.command);
@@ -583,24 +592,6 @@ public class CliClient {
return true;
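
A simplified sketch of the pattern the diff converges on: the many per-statement handlers collapse into one generic DDL handler parameterized by success and failure messages. The executor below is a hypothetical stand-in, not the SQL client's real Executor interface:

```java
import java.util.function.Consumer;

public class CallDdlSketch {
    private final Consumer<String> executor; // stand-in for the real Executor

    CallDdlSketch(Consumer<String> executor) {
        this.executor = executor;
    }

    // One handler covers CREATE/DROP CATALOG, DATABASE, TABLE, FUNCTION, ...:
    // run the raw DDL and report a statement-specific message instead of
    // duplicating the try/catch logic per statement type.
    void callDdl(String ddl, String successMessage, String errorMessage) {
        try {
            executor.accept(ddl);
            System.out.println(successMessage);
        } catch (RuntimeException e) {
            System.err.println(errorMessage + ": " + e.getMessage());
        }
    }

    public static void main(String[] args) {
        CallDdlSketch cli = new CallDdlSketch(ddl -> { /* pretend to execute */ });
        cli.callDdl("CREATE CATALOG c WITH ('type' = 'generic_in_memory')",
                "Catalog has been created.", "Could not create catalog");
    }
}
```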
  

[flink] branch master updated (e2cffe1 -> b28cca2)

2020-06-05 Thread jark
This is an automated email from the ASF dual-hosted git repository.

jark pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from e2cffe1  [FLINK-15687][runtime][test] Make TaskManagerActions access 
task slot table on rpc main thread in TaskSubmissionTestEnvironment.
 add b28cca2  [FLINK-18059][sql-client] Fix create/drop catalog statement 
can not be executed in sql client

No new revisions were added by this update.

Summary of changes:
 .../apache/flink/table/client/cli/CliClient.java   | 111 +
 .../apache/flink/table/client/cli/CliStrings.java  |   8 +-
 .../flink/table/client/cli/CliClientTest.java  |  14 +++
 .../flink/table/client/cli/TestingExecutor.java|  15 ++-
 .../table/client/cli/TestingExecutorBuilder.java   |  12 ++-
 5 files changed, 68 insertions(+), 92 deletions(-)



[flink] 02/02: [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 10b284201337750422b663e8b69b7d867825da40
Author: Xintong Song 
AuthorDate: Thu May 28 16:23:31 2020 +0800

[FLINK-15687][runtime][test] Make TaskManagerActions access task slot table 
on rpc main thread in TaskSubmissionTestEnvironment.

This closes #12399.
---
 .../TaskSubmissionTestEnvironment.java | 25 +++---
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 14adabf..e79b720 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
jobMasterGateway = testingJobMasterGateway;
}
 
-   TaskManagerActions taskManagerActions;
-   if (taskManagerActionListeners.size() == 0) {
-   taskManagerActions = new NoOpTaskManagerActions();
-   } else {
-   TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(taskSlotTable, jobMasterGateway);
-   for (Tuple3> listenerTuple : taskManagerActionListeners) {
-   
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
-   }
-   taskManagerActions = testTaskManagerActions;
-   }
-
this.testingRpcService = testingRpcService;
final DefaultJobTable jobTable = DefaultJobTable.create();
-   registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
 
TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
false,
@@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
taskExecutor.waitUntilStarted();
 
this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
+
+   TaskManagerActions taskManagerActions;
+   if (taskManagerActionListeners.size() == 0) {
+   taskManagerActions = new NoOpTaskManagerActions();
+   } else {
+   TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway);
+   for (Tuple3> listenerTuple : taskManagerActionListeners) {
+   
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
+   }
+   taskManagerActions = testTaskManagerActions;
+   }
+
+   registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
}
 
static void registerJobMasterConnection(



[flink] branch release-1.11 updated (37f436e -> 10b2842)

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git.


from 37f436e  [FLINK-18143][python] Fix Python meter metric incorrect value 
problem (#12498)
 new 359ab03  [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via 
TaskSubmissionTestEnvironment not in RPC main thread.
 new 10b2842  [FLINK-15687][runtime][test] Make TaskManagerActions access 
task slot table on rpc main thread in TaskSubmissionTestEnvironment.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TaskSubmissionTestEnvironment.java |  34 ++--
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +
 3 files changed, 230 insertions(+), 15 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java



[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 359ab037349145b8eaadbd476d2f1a98bdab45ed
Author: Xintong Song 
AuthorDate: Thu May 28 16:11:31 2020 +0800

[FLINK-15687][runtime][test] Fix accessing TaskSlotTable via 
TaskSubmissionTestEnvironment not in RPC main thread.
---
 .../TaskSubmissionTestEnvironment.java |   9 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +
 3 files changed, 217 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
private final TestingHighAvailabilityServices haServices;
private final TemporaryFolder temporaryFolder;
-   private final TaskSlotTable taskSlotTable;
+   private final ThreadSafeTaskSlotTable threadSafeTaskSlotTable;
private final JobMasterId jobMasterId;
 
private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
this.jobMasterId = jobMasterId;
 
-   this.taskSlotTable = slotSize > 0 ?
+   final TaskSlotTable taskSlotTable = slotSize > 0 ?
TaskSlotUtils.createTaskSlotTable(slotSize) :
TestingTaskSlotTable
.newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
taskExecutor.start();
taskExecutor.waitUntilStarted();
+
+   this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
}
 
static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
}
 
public TaskSlotTable getTaskSlotTable() {
-   return taskSlotTable;
+   return threadSafeTaskSlotTable;
}
 
public JobMasterId getJobMasterId() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
void waitUntilStarted() {
startFuture.join();
}
+
+   MainThreadExecutable getMainThreadExecutableForTesting() {
+   return this.rpcServer;
+   }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 000..0b9b668
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF 

[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 84c290581694f17e08f9ba91be6eac70264b9d70
Author: Xintong Song 
AuthorDate: Thu May 28 16:11:31 2020 +0800

[FLINK-15687][runtime][test] Fix accessing TaskSlotTable via 
TaskSubmissionTestEnvironment not in RPC main thread.
---
 .../TaskSubmissionTestEnvironment.java |   9 +-
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +
 3 files changed, 217 insertions(+), 3 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 824e0f0..14adabf 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -51,6 +51,7 @@ import 
org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils;
 import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable;
+import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable;
 import org.apache.flink.runtime.taskexecutor.slot.TimerService;
 import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
 
private final TestingHighAvailabilityServices haServices;
private final TemporaryFolder temporaryFolder;
-   private final TaskSlotTable taskSlotTable;
+   private final ThreadSafeTaskSlotTable threadSafeTaskSlotTable;
private final JobMasterId jobMasterId;
 
private TestingTaskExecutor taskExecutor;
@@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
this.jobMasterId = jobMasterId;
 
-   this.taskSlotTable = slotSize > 0 ?
+   final TaskSlotTable taskSlotTable = slotSize > 0 ?
TaskSlotUtils.createTaskSlotTable(slotSize) :
TestingTaskSlotTable
.newBuilder()
@@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
 
taskExecutor.start();
taskExecutor.waitUntilStarted();
+
+   this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
}
 
static void registerJobMasterConnection(
@@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
}
 
public TaskSlotTable getTaskSlotTable() {
-   return taskSlotTable;
+   return threadSafeTaskSlotTable;
}
 
public JobMasterId getJobMasterId() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
index 7a4a01f..95b2f2d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker;
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup;
 import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 
 import javax.annotation.Nullable;
@@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor {
void waitUntilStarted() {
startFuture.join();
}
+
+   MainThreadExecutable getMainThreadExecutableForTesting() {
+   return this.rpcServer;
+   }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
new file mode 100644
index 000..0b9b668
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses 

[flink] branch master updated (dcd764a -> e2cffe1)

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git.


from dcd764a  [FLINK-18143][python] Fix Python meter metric incorrect value 
problem (#12498)
 new 84c2905  [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via 
TaskSubmissionTestEnvironment not in RPC main thread.
 new e2cffe1  [FLINK-15687][runtime][test] Make TaskManagerActions access 
task slot table on rpc main thread in TaskSubmissionTestEnvironment.

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../TaskSubmissionTestEnvironment.java |  34 ++--
 .../runtime/taskexecutor/TestingTaskExecutor.java  |   5 +
 .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 +
 3 files changed, 230 insertions(+), 15 deletions(-)
 create mode 100644 
flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
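
The two commits funnel every test-side slot-table access through the task executor's RPC main thread. A simplified, hypothetical sketch of that thread-confinement pattern; the MainThreadExecutable interface below is a stand-in, not Flink's actual API:

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.function.Supplier;

public class MainThreadConfinedView<T> {
    /** Stand-in for a main-thread executor: runs a call on the confined thread. */
    public interface MainThreadExecutable {
        <V> CompletableFuture<V> callAsync(Supplier<V> call);
    }

    private final T delegate;
    private final MainThreadExecutable mainThread;

    public MainThreadConfinedView(T delegate, MainThreadExecutable mainThread) {
        this.delegate = delegate;
        this.mainThread = mainThread;
    }

    /** Run an accessor against the delegate on the main thread, block for the result. */
    public <V> V call(Function<T, V> accessor) {
        return mainThread.callAsync(() -> accessor.apply(delegate)).join();
    }

    public static void main(String[] args) {
        // Demo with a same-thread executable; the real wrapper would hand the
        // call to the RPC server's main thread instead.
        MainThreadExecutable direct = new MainThreadExecutable() {
            @Override
            public <V> CompletableFuture<V> callAsync(Supplier<V> call) {
                return CompletableFuture.completedFuture(call.get());
            }
        };
        MainThreadConfinedView<java.util.List<String>> view =
                new MainThreadConfinedView<>(new java.util.ArrayList<>(), direct);
        System.out.println("size = " + view.call(java.util.List::size));
    }
}
```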



[flink] 02/02: [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.

2020-06-05 Thread xtsong
This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2cffe19a044d6decfae83e14ce59eca1e4c449f
Author: Xintong Song 
AuthorDate: Thu May 28 16:23:31 2020 +0800

[FLINK-15687][runtime][test] Make TaskManagerActions access task slot table 
on rpc main thread in TaskSubmissionTestEnvironment.

This closes #12399.
---
 .../TaskSubmissionTestEnvironment.java | 25 +++---
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 14adabf..e79b720 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
jobMasterGateway = testingJobMasterGateway;
}
 
-   TaskManagerActions taskManagerActions;
-   if (taskManagerActionListeners.size() == 0) {
-   taskManagerActions = new NoOpTaskManagerActions();
-   } else {
-   TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(taskSlotTable, jobMasterGateway);
-   for (Tuple3> listenerTuple : taskManagerActionListeners) {
-   
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
-   }
-   taskManagerActions = testTaskManagerActions;
-   }
-
this.testingRpcService = testingRpcService;
final DefaultJobTable jobTable = DefaultJobTable.create();
-   registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
 
TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
false,
@@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
taskExecutor.waitUntilStarted();
 
this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
+
+   TaskManagerActions taskManagerActions;
+   if (taskManagerActionListeners.size() == 0) {
+   taskManagerActions = new NoOpTaskManagerActions();
+   } else {
+   TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway);
+   for (Tuple3> listenerTuple : taskManagerActionListeners) {
+   
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
+   }
+   taskManagerActions = testTaskManagerActions;
+   }
+
+   registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
}
 
static void registerJobMasterConnection(