[flink] branch master updated: [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data.
This is an automated email from the ASF dual-hosted git repository. zhijiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new d25609d [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data. d25609d is described below commit d25609d07f52ba9bac3f7bb33ce0635e255e3d9d Author: Arvid Heise AuthorDate: Thu Jun 4 22:51:18 2020 +0200 [FLINK-18139][checkpointing] Fixing unaligned checkpoints checks wrong channels for inflight data. CheckpointBarrierUnaligner#hasInflightData was not called with input gate contextual information, such that only the same first few channels are checked during initial snapshotting of inflight data for multi-gate setups. --- .../partition/consumer/SingleInputGateBuilder.java | 4 +- .../io/AlternatingCheckpointBarrierHandler.java| 5 +- .../runtime/io/CheckpointBarrierHandler.java | 3 +- .../runtime/io/CheckpointBarrierUnaligner.java | 4 +- .../runtime/io/CheckpointedInputGate.java | 5 +- .../AlternatingCheckpointBarrierHandlerTest.java | 4 +- .../runtime/io/StreamTaskNetworkInputTest.java | 73 +- 7 files changed, 85 insertions(+), 13 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java index ad607f7..b279998 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateBuilder.java @@ -70,12 +70,12 @@ public class SingleInputGateBuilder { return this; } - SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) { + public SingleInputGateBuilder setConsumedSubpartitionIndex(int consumedSubpartitionIndex) { 
this.consumedSubpartitionIndex = consumedSubpartitionIndex; return this; } - SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) { + public SingleInputGateBuilder setSingleInputGateIndex(int gateIndex) { this.gateIndex = gateIndex; return this; } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java index 09f05b9..8ed6788 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/AlternatingCheckpointBarrierHandler.java @@ -18,6 +18,7 @@ package org.apache.flink.streaming.runtime.io; import org.apache.flink.runtime.checkpoint.CheckpointException; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import org.apache.flink.runtime.io.network.buffer.BufferReceivedListener; @@ -99,9 +100,9 @@ class AlternatingCheckpointBarrierHandler extends CheckpointBarrierHandler { } @Override - public boolean hasInflightData(long checkpointId, int channelIndex) { + public boolean hasInflightData(long checkpointId, InputChannelInfo channelInfo) { // should only be called for unaligned checkpoint - return unalignedHandler.hasInflightData(checkpointId, channelIndex); + return unalignedHandler.hasInflightData(checkpointId, channelInfo); } @Override diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java index 07f6f7a..952af24 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/CheckpointBarrierHandler.java @@ -22,6 +22,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.checkpoint.CheckpointMetaData; import org.apache.flink.runtime.checkpoint.CheckpointMetrics; +import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo; import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker; import org.apache.flink.runtime.io.network.api.CheckpointBarrier; import
[flink] 02/05: [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 31c3a15731e91d01fb033fe5a8f8173e3ba0cb38 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:33:24 2020 +0200 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase --- .../org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index a651d12..5dc1137 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -278,7 +278,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .setStdoutProcessor(messages::add) .runNonBlocking()) { - final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120)); while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) { try { LOG.info("Waiting for messages. Received {}/{}.", messages.size(),
[flink] 01/05: [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:16:06 2020 +0200 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase Before, it could happen that we time out and return early, which would lead to a test failure. Now, we would fail at the source of the problem. --- .../org/apache/flink/tests/util/kafka/KafkaResource.java | 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java| 15 ++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java index 679d6c4..0157ad2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java @@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource { InetSocketAddress getZookeeperAddress(); /** -* Reads up to {@code maxNumMessages} from the given topic. +* Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number +* of messages we throw an exception. 
* -* @param maxNumMessages maximum number of messages that should be read +* @param expectedNumMessages expected number of messages that should be read * @param groupId group id to identify consumer * @param topic topic name * @return read messages * @throws IOException */ - List readMessage(int maxNumMessages, String groupId, String topic) throws IOException; + List readMessage(int expectedNumMessages, String groupId, String topic) throws IOException; /** * Modifies the number of partitions for the given topic. diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index 405690f..a651d12 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource { } @Override - public List readMessage(int maxNumMessages, String groupId, String topic) throws IOException { - final List messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages)); + public List readMessage(int expectedNumMessages, String groupId, String topic) throws IOException { + final List messages = Collections.synchronizedList(new ArrayList<>( + expectedNumMessages)); try (final AutoClosableProcess kafka = AutoClosableProcess .create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(), @@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource { KAFKA_ADDRESS, "--from-beginning", "--max-messages", - String.valueOf(maxNumMessages), + String.valueOf(expectedNumMessages), "--topic", topic, 
"--consumer-property", @@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .runNonBlocking()) { final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); - while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) { + while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) { try { - LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages); + LOG.info("Waiting for messages. Received {}/{}.", messages.size(), + expectedNumMessages);
[flink] 04/05: [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit a03c8f13f735bf5b8d120fb7aa95587574b6dfd6 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 15:20:19 2020 +0200 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes Duplicate topic names and leftover data could be a potential source of instabilities. --- .../org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index bf7358d..fabefc1 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -58,6 +58,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.junit.Assert.assertThat; @@ -92,6 +93,7 @@ public class SQLClientKafkaITCase extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + private final String kafkaVersion; private final String kafkaSQLVersion; private Path result; private Path sqlClientSessionConf; @@ -107,6 +109,7 @@ public class SQLClientKafkaITCase extends TestLogger { public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) { this.kafka = KafkaResource.get(kafkaVersion); + this.kafkaVersion = kafkaVersion; this.kafkaSQLVersion = kafkaSQLVersion; 
this.sqlConnectorKafkaJar = TestUtils.getResourceJar(kafkaSQLJarPattern); @@ -129,8 +132,8 @@ public class SQLClientKafkaITCase extends TestLogger { public void testKafka() throws Exception { try (ClusterController clusterController = flink.startCluster(2)) { // Create topic and send message - String testJsonTopic = "test-json"; - String testAvroTopic = "test-avro"; + String testJsonTopic = "test-json-" + kafkaVersion + "-" + UUID.randomUUID().toString(); + String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString(); kafka.createTopic(1, 1, testJsonTopic); String[] messages = new String[]{ "{\"timestamp\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
[flink] 03/05: [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit e4034ac296d4e8b43af570d3436cd2dec33d9ef1 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:34:16 2020 +0200 [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes Duplicate topic names and leftover data could be a potential source of instabilities. --- .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java index 4f62839..5e64159 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java @@ -43,6 +43,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -65,6 +66,8 @@ public class StreamingKafkaITCase extends TestLogger { private final Path kafkaExampleJar; + private final String kafkaVersion; + @Rule public final KafkaResource kafka; @@ -81,14 +84,15 @@ public class StreamingKafkaITCase extends TestLogger { public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) { this.kafkaExampleJar = TestUtils.getResourceJar(kafkaExampleJarPattern); this.kafka = KafkaResource.get(kafkaVersion); + this.kafkaVersion = kafkaVersion; } @Test public void testKafka() throws Exception { try (final ClusterController clusterController = flink.startCluster(1)) { - 
final String inputTopic = "test-input"; - final String outputTopic = "test-output"; + final String inputTopic = "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString(); + final String outputTopic = "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString(); // create the required topics kafka.createTopic(1, 1, inputTopic);
[flink] 05/05: [FLINK-18020] Increase timeout in SQLClientKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit cc59b3a7c4853be80a9d2868ed9e7d6c09966edc Author: Aljoscha Krettek AuthorDate: Thu Jun 4 15:21:06 2020 +0200 [FLINK-18020] Increase timeout in SQLClientKafkaITCase --- .../apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index fabefc1..f087e31 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.kafka; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.cache.DownloadCache; import org.apache.flink.tests.util.categories.TravisGroup1; @@ -52,6 +53,7 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -234,8 +236,8 @@ public class SQLClientKafkaITCase extends TestLogger { private void checkCsvResultFile() throws Exception { boolean success = false; - long maxRetries = 10, duration = 5000L; - for (int i = 0; i < maxRetries; i++) { + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120)); + while (!success && deadline.hasTimeLeft()) { if (Files.exists(result)) { byte[] bytes = Files.readAllBytes(result); 
String[] lines = new String(bytes, Charsets.UTF_8).split("\n"); @@ -255,8 +257,8 @@ public class SQLClientKafkaITCase extends TestLogger { } else { LOG.info("The target CSV {} does not exist now", result); } - Thread.sleep(duration); + Thread.sleep(500); } - Assert.assertTrue("Timeout(" + (maxRetries * duration) + " sec) to read the correct CSV results.", success); + Assert.assertTrue("Did not get expected results before timeout.", success); } }
[flink] branch release-1.11 updated (39e3d6a -> cc59b3a)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 39e3d6a [FLINK-18048] Fix --host option for standalone application cluster new 6505fe4 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase new 31c3a15 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase new e4034ac [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes new a03c8f1 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes new cc59b3a [FLINK-18020] Increase timeout in SQLClientKafkaITCase The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/tests/util/kafka/KafkaResource.java| 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java | 17 +++-- .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++-- .../flink/tests/util/kafka/StreamingKafkaITCase.java| 8 ++-- 4 files changed, 32 insertions(+), 17 deletions(-)
[flink] 03/05: [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit e4034ac296d4e8b43af570d3436cd2dec33d9ef1 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:34:16 2020 +0200 [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes Duplicate topic names and leftover data could be a potential source of instabilities. --- .../org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java | 8 ++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java index 4f62839..5e64159 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/StreamingKafkaITCase.java @@ -43,6 +43,7 @@ import java.nio.file.Path; import java.util.Arrays; import java.util.Collection; import java.util.List; +import java.util.UUID; import java.util.stream.Collectors; /** @@ -65,6 +66,8 @@ public class StreamingKafkaITCase extends TestLogger { private final Path kafkaExampleJar; + private final String kafkaVersion; + @Rule public final KafkaResource kafka; @@ -81,14 +84,15 @@ public class StreamingKafkaITCase extends TestLogger { public StreamingKafkaITCase(final String kafkaExampleJarPattern, final String kafkaVersion) { this.kafkaExampleJar = TestUtils.getResourceJar(kafkaExampleJarPattern); this.kafka = KafkaResource.get(kafkaVersion); + this.kafkaVersion = kafkaVersion; } @Test public void testKafka() throws Exception { try (final ClusterController clusterController = flink.startCluster(1)) { - 
final String inputTopic = "test-input"; - final String outputTopic = "test-output"; + final String inputTopic = "test-input-" + kafkaVersion + "-" + UUID.randomUUID().toString(); + final String outputTopic = "test-output" + kafkaVersion + "-" + UUID.randomUUID().toString(); // create the required topics kafka.createTopic(1, 1, inputTopic);
[flink] 01/05: [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 6505fe43d1b0cc3cf8543caf3fbdb1deae9697c5 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:16:06 2020 +0200 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase Before, it could happen that we time out and return early, which would lead to a test failure. Now, we would fail at the source of the problem. --- .../org/apache/flink/tests/util/kafka/KafkaResource.java | 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java| 15 ++- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java index 679d6c4..0157ad2 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/KafkaResource.java @@ -63,15 +63,16 @@ public interface KafkaResource extends ExternalResource { InetSocketAddress getZookeeperAddress(); /** -* Reads up to {@code maxNumMessages} from the given topic. +* Reads {@code expectedNumMessages} from the given topic. If we can't read the expected number +* of messages we throw an exception. 
* -* @param maxNumMessages maximum number of messages that should be read +* @param expectedNumMessages expected number of messages that should be read * @param groupId group id to identify consumer * @param topic topic name * @return read messages * @throws IOException */ - List readMessage(int maxNumMessages, String groupId, String topic) throws IOException; + List readMessage(int expectedNumMessages, String groupId, String topic) throws IOException; /** * Modifies the number of partitions for the given topic. diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index 405690f..a651d12 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -260,8 +260,9 @@ public class LocalStandaloneKafkaResource implements KafkaResource { } @Override - public List readMessage(int maxNumMessages, String groupId, String topic) throws IOException { - final List messages = Collections.synchronizedList(new ArrayList<>(maxNumMessages)); + public List readMessage(int expectedNumMessages, String groupId, String topic) throws IOException { + final List messages = Collections.synchronizedList(new ArrayList<>( + expectedNumMessages)); try (final AutoClosableProcess kafka = AutoClosableProcess .create(kafkaDir.resolve(Paths.get("bin", "kafka-console-consumer.sh")).toString(), @@ -269,7 +270,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource { KAFKA_ADDRESS, "--from-beginning", "--max-messages", - String.valueOf(maxNumMessages), + String.valueOf(expectedNumMessages), "--topic", topic, 
"--consumer-property", @@ -278,15 +279,19 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .runNonBlocking()) { final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); - while (deadline.hasTimeLeft() && messages.size() < maxNumMessages) { + while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) { try { - LOG.info("Waiting for messages. Received {}/{}.", messages.size(), maxNumMessages); + LOG.info("Waiting for messages. Received {}/{}.", messages.size(), + expectedNumMessages);
[flink] 05/05: [FLINK-18020] Increase timeout in SQLClientKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit cc59b3a7c4853be80a9d2868ed9e7d6c09966edc Author: Aljoscha Krettek AuthorDate: Thu Jun 4 15:21:06 2020 +0200 [FLINK-18020] Increase timeout in SQLClientKafkaITCase --- .../apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 10 ++ 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index fabefc1..f087e31 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -18,6 +18,7 @@ package org.apache.flink.tests.util.kafka; +import org.apache.flink.api.common.time.Deadline; import org.apache.flink.tests.util.TestUtils; import org.apache.flink.tests.util.cache.DownloadCache; import org.apache.flink.tests.util.categories.TravisGroup1; @@ -52,6 +53,7 @@ import java.net.URL; import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.StandardOpenOption; +import java.time.Duration; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; @@ -234,8 +236,8 @@ public class SQLClientKafkaITCase extends TestLogger { private void checkCsvResultFile() throws Exception { boolean success = false; - long maxRetries = 10, duration = 5000L; - for (int i = 0; i < maxRetries; i++) { + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120)); + while (!success && deadline.hasTimeLeft()) { if (Files.exists(result)) { byte[] bytes = Files.readAllBytes(result); 
String[] lines = new String(bytes, Charsets.UTF_8).split("\n"); @@ -255,8 +257,8 @@ public class SQLClientKafkaITCase extends TestLogger { } else { LOG.info("The target CSV {} does not exist now", result); } - Thread.sleep(duration); + Thread.sleep(500); } - Assert.assertTrue("Timeout(" + (maxRetries * duration) + " sec) to read the correct CSV results.", success); + Assert.assertTrue("Did not get expected results before timeout.", success); } }
[flink] 04/05: [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit a03c8f13f735bf5b8d120fb7aa95587574b6dfd6 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 15:20:19 2020 +0200 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes Duplicate topic names and leftover data could be a potential source of instabilities. --- .../org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java| 7 +-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java index bf7358d..fabefc1 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/test/java/org/apache/flink/tests/util/kafka/SQLClientKafkaITCase.java @@ -58,6 +58,7 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; import static org.hamcrest.Matchers.arrayContainingInAnyOrder; import static org.junit.Assert.assertThat; @@ -92,6 +93,7 @@ public class SQLClientKafkaITCase extends TestLogger { @Rule public final TemporaryFolder tmp = new TemporaryFolder(); + private final String kafkaVersion; private final String kafkaSQLVersion; private Path result; private Path sqlClientSessionConf; @@ -107,6 +109,7 @@ public class SQLClientKafkaITCase extends TestLogger { public SQLClientKafkaITCase(String kafkaVersion, String kafkaSQLVersion, String kafkaSQLJarPattern) { this.kafka = KafkaResource.get(kafkaVersion); + this.kafkaVersion = kafkaVersion; this.kafkaSQLVersion = kafkaSQLVersion; 
this.sqlConnectorKafkaJar = TestUtils.getResourceJar(kafkaSQLJarPattern); @@ -129,8 +132,8 @@ public class SQLClientKafkaITCase extends TestLogger { public void testKafka() throws Exception { try (ClusterController clusterController = flink.startCluster(2)) { // Create topic and send message - String testJsonTopic = "test-json"; - String testAvroTopic = "test-avro"; + String testJsonTopic = "test-json-" + kafkaVersion + "-" + UUID.randomUUID().toString(); + String testAvroTopic = "test-avro-" + kafkaVersion + "-" + UUID.randomUUID().toString(); kafka.createTopic(1, 1, testJsonTopic); String[] messages = new String[]{ "{\"timestamp\": \"2018-03-12T08:00:00Z\", \"user\": \"Alice\", \"event\": { \"type\": \"WARNING\", \"message\": \"This is a warning.\"}}",
[flink] branch release-1.11 updated (39e3d6a -> cc59b3a)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 39e3d6a [FLINK-18048] Fix --host option for standalone application cluster new 6505fe4 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase new 31c3a15 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase new e4034ac [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes new a03c8f1 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes new cc59b3a [FLINK-18020] Increase timeout in SQLClientKafkaITCase The 5 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../apache/flink/tests/util/kafka/KafkaResource.java| 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java | 17 +++-- .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++-- .../flink/tests/util/kafka/StreamingKafkaITCase.java| 8 ++-- 4 files changed, 32 insertions(+), 17 deletions(-)
[flink] 02/05: [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 31c3a15731e91d01fb033fe5a8f8173e3ba0cb38 Author: Aljoscha Krettek AuthorDate: Thu Jun 4 10:33:24 2020 +0200 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase --- .../org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java index a651d12..5dc1137 100644 --- a/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java +++ b/flink-end-to-end-tests/flink-end-to-end-tests-common-kafka/src/main/java/org/apache/flink/tests/util/kafka/LocalStandaloneKafkaResource.java @@ -278,7 +278,7 @@ public class LocalStandaloneKafkaResource implements KafkaResource { .setStdoutProcessor(messages::add) .runNonBlocking()) { - final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(30)); + final Deadline deadline = Deadline.fromNow(Duration.ofSeconds(120)); while (deadline.hasTimeLeft() && messages.size() < expectedNumMessages) { try { LOG.info("Waiting for messages. Received {}/{}.", messages.size(),
[flink] branch master updated (5834373 -> 971e7b2)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5834373 [FLINK-18048] Fix --host option for standalone application cluster add c389ad0 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase add 455fb24 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase add ce7b41e [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes add f6bbff4 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes add 971e7b2 [FLINK-18020] Increase timeout in SQLClientKafkaITCase No new revisions were added by this update. Summary of changes: .../apache/flink/tests/util/kafka/KafkaResource.java| 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java | 17 +++-- .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++-- .../flink/tests/util/kafka/StreamingKafkaITCase.java| 8 ++-- 4 files changed, 32 insertions(+), 17 deletions(-)
[flink] branch master updated (5834373 -> 971e7b2)
This is an automated email from the ASF dual-hosted git repository. aljoscha pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 5834373 [FLINK-18048] Fix --host option for standalone application cluster add c389ad0 [FLINK-17260] Make number of expected messages explicit in StreamingKafkaITCase add 455fb24 [FLINK-17260] Increase timeout for reading Kafka messages in StreamingKafkaITCase add ce7b41e [FLINK-17260] Make topic names unique in StreamingKafkaITCase to prevent clashes add f6bbff4 [FLINK-18020] Make topic names unique in SQLClientKafkaITCase to prevent clashes add 971e7b2 [FLINK-18020] Increase timeout in SQLClientKafkaITCase No new revisions were added by this update. Summary of changes: .../apache/flink/tests/util/kafka/KafkaResource.java| 7 --- .../tests/util/kafka/LocalStandaloneKafkaResource.java | 17 +++-- .../flink/tests/util/kafka/SQLClientKafkaITCase.java| 17 +++-- .../flink/tests/util/kafka/StreamingKafkaITCase.java| 8 ++-- 4 files changed, 32 insertions(+), 17 deletions(-)
[flink] branch release-1.10 updated: [FLINK-18048] Fix --host option for standalone job cluster
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.10 by this push: new 855f799 [FLINK-18048] Fix --host option for standalone job cluster 855f799 is described below commit 855f799d9919348619f17cfcb4d6544dc02afdfb Author: wangyang0918 AuthorDate: Mon Jun 1 19:40:14 2020 +0800 [FLINK-18048] Fix --host option for standalone job cluster This closes #12495. --- .../StandaloneJobClusterConfigurationParserFactory.java | 1 + .../StandaloneJobClusterConfigurationParserFactoryTest.java | 8 2 files changed, 9 insertions(+) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java index a99e277..ea03313 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactory.java @@ -68,6 +68,7 @@ public class StandaloneJobClusterConfigurationParserFactory implements ParserRes options.addOption(JOB_CLASS_NAME_OPTION); options.addOption(JOB_ID_OPTION); options.addOption(DYNAMIC_PROPERTY_OPTION); + options.addOption(HOST_OPTION); options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION); options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION); diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java index 4a72d3a..ad1cdb5 100644 --- 
a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneJobClusterConfigurationParserFactoryTest.java @@ -178,4 +178,12 @@ public class StandaloneJobClusterConfigurationParserFactoryTest extends TestLogg assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true)); } + @Test + public void testHostOption() throws FlinkParseException { + final String hostName = "user-specified-hostname"; + final String[] args = {"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName}; + final StandaloneJobClusterConfiguration applicationClusterConfiguration = commandLineParser.parse(args); + assertThat(applicationClusterConfiguration.getHostname(), is(hostName)); + } + }
[flink] branch release-1.11 updated: [FLINK-18048] Fix --host option for standalone application cluster
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 39e3d6a [FLINK-18048] Fix --host option for standalone application cluster 39e3d6a is described below commit 39e3d6ad6ce25bc94a9515362a96c229f3d0c590 Author: wangyang0918 AuthorDate: Mon Jun 1 17:30:12 2020 +0800 [FLINK-18048] Fix --host option for standalone application cluster This closes #12426. --- .../StandaloneApplicationClusterConfigurationParserFactory.java | 1 + ...tandaloneApplicationClusterConfigurationParserFactoryTest.java | 8 2 files changed, 9 insertions(+) diff --git a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java index cc120c0..6c3bfb3 100644 --- a/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java +++ b/flink-container/src/main/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactory.java @@ -68,6 +68,7 @@ public class StandaloneApplicationClusterConfigurationParserFactory implements P options.addOption(JOB_CLASS_NAME_OPTION); options.addOption(JOB_ID_OPTION); options.addOption(DYNAMIC_PROPERTY_OPTION); + options.addOption(HOST_OPTION); options.addOption(CliFrontendParser.SAVEPOINT_PATH_OPTION); options.addOption(CliFrontendParser.SAVEPOINT_ALLOW_NON_RESTORED_OPTION); diff --git a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java index a12de88..9c7e94c 100644 
--- a/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java +++ b/flink-container/src/test/java/org/apache/flink/container/entrypoint/StandaloneApplicationClusterConfigurationParserFactoryTest.java @@ -233,4 +233,12 @@ public class StandaloneApplicationClusterConfigurationParserFactoryTest extends assertThat(savepointRestoreSettings.allowNonRestoredState(), is(true)); } + @Test + public void testHostOption() throws FlinkParseException { + final String hostName = "user-specified-hostname"; + final String[] args = {"--configDir", confDirPath, "--job-classname", "foobar", "--host", hostName}; + final StandaloneApplicationClusterConfiguration applicationClusterConfiguration = commandLineParser.parse(args); + assertThat(applicationClusterConfiguration.getHostname(), is(hostName)); + } + }
[flink] branch master updated (61e6f70 -> 5834373)
This is an automated email from the ASF dual-hosted git repository. kkloudas pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 61e6f70 [FLINK-18076][table sql / client] Use correct classloader when parsing queries add 5834373 [FLINK-18048] Fix --host option for standalone application cluster No new revisions were added by this update. Summary of changes: .../StandaloneApplicationClusterConfigurationParserFactory.java | 1 + ...tandaloneApplicationClusterConfigurationParserFactoryTest.java | 8 2 files changed, 9 insertions(+)
[flink] branch master updated (b39ef6c -> 61e6f70)
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b39ef6c [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL add 61e6f70 [FLINK-18076][table sql / client] Use correct classloader when parsing queries No new revisions were added by this update. Summary of changes: .../table/client/gateway/local/LocalExecutor.java | 15 ++- .../table/client/gateway/local/DependencyTest.java | 50 -- 2 files changed, 51 insertions(+), 14 deletions(-)
[flink] branch release-1.11 updated: [FLINK-18076][table sql / client] Use correct classloader when parsing queries
This is an automated email from the ASF dual-hosted git repository. dwysakowicz pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new e7902bb [FLINK-18076][table sql / client] Use correct classloader when parsing queries e7902bb is described below commit e7902bb4d1329833870ee53c782c6431cfc8cb80 Author: Leonard Xu AuthorDate: Thu Jun 4 23:53:07 2020 +0800 [FLINK-18076][table sql / client] Use correct classloader when parsing queries This closes #12475 --- .../table/client/gateway/local/LocalExecutor.java | 15 ++- .../table/client/gateway/local/DependencyTest.java | 50 -- 2 files changed, 51 insertions(+), 14 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java index fd78712..604734e 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java @@ -41,6 +41,7 @@ import org.apache.flink.table.api.TableEnvironment; import org.apache.flink.table.api.TableResult; import org.apache.flink.table.api.TableSchema; import org.apache.flink.table.api.internal.TableEnvironmentInternal; +import org.apache.flink.table.catalog.UnresolvedIdentifier; import org.apache.flink.table.catalog.exceptions.CatalogException; import org.apache.flink.table.client.SqlClientException; import org.apache.flink.table.client.config.Environment; @@ -56,6 +57,7 @@ import org.apache.flink.table.client.gateway.local.result.ChangelogResult; import org.apache.flink.table.client.gateway.local.result.DynamicResult; import org.apache.flink.table.client.gateway.local.result.MaterializedResult; import 
org.apache.flink.table.delegation.Parser; +import org.apache.flink.table.operations.Operation; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.logical.utils.LogicalTypeUtils; import org.apache.flink.table.types.utils.DataTypeUtils; @@ -460,7 +462,18 @@ public class LocalExecutor implements Executor { public Parser getSqlParser(String sessionId) { final ExecutionContext context = getExecutionContext(sessionId); final TableEnvironment tableEnv = context.getTableEnvironment(); - return ((TableEnvironmentInternal) tableEnv).getParser(); + final Parser parser = ((TableEnvironmentInternal) tableEnv).getParser(); + return new Parser() { + @Override + public List parse(String statement) { + return context.wrapClassLoader(() -> parser.parse(statement)); + } + + @Override + public UnresolvedIdentifier parseIdentifier(String identifier) { + return context.wrapClassLoader(() -> parser.parseIdentifier(identifier)); + } + }; } @Override diff --git a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java index 33633f3..87f750d 100644 --- a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java +++ b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/DependencyTest.java @@ -44,10 +44,12 @@ import org.apache.flink.table.client.gateway.SessionContext; import org.apache.flink.table.client.gateway.utils.EnvironmentFileUtil; import org.apache.flink.table.client.gateway.utils.TestTableSinkFactoryBase; import org.apache.flink.table.client.gateway.utils.TestTableSourceFactoryBase; +import org.apache.flink.table.delegation.Parser; import org.apache.flink.table.descriptors.DescriptorProperties; import org.apache.flink.table.factories.CatalogFactory; import org.apache.flink.table.factories.ModuleFactory; import 
org.apache.flink.table.module.Module; +import org.apache.flink.table.operations.Operation; import org.apache.flink.table.types.DataType; import org.junit.Test; @@ -65,6 +67,7 @@ import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATA import static org.apache.flink.table.descriptors.CatalogDescriptorValidator.CATALOG_TYPE; import static org.apache.flink.table.descriptors.ModuleDescriptorValidator.MODULE_TYPE; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; /** * Dependency tests for {@link LocalExecutor}. Mainly for testing classloading of dependencies. @@
[flink] branch release-1.11 updated (ec65e38 -> ce63fd1)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from ec65e38 [hotfix][docs] Remove HADOOP_CONF_DIR add ce63fd1 [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL No new revisions were added by this update. Summary of changes: docs/dev/table/sql/queries.md | 14 +-- docs/dev/table/sql/queries.zh.md | 14 +-- docs/dev/table/streaming/joins.md | 8 +++--- docs/dev/table/streaming/joins.zh.md | 8 +++--- docs/dev/table/streaming/match_recognize.md| 4 +-- docs/dev/table/streaming/match_recognize.zh.md | 4 +-- docs/dev/table/tableApi.md | 18 +++--- docs/dev/table/tableApi.zh.md | 18 +++--- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 2 +- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 2 +- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 2 +- ...ndowJoin.scala => StreamExecIntervalJoin.scala} | 29 +++--- .../FlinkChangelogModeInferenceProgram.scala | 8 +++--- .../planner/plan/rules/FlinkStreamRuleSets.scala | 2 +- ...Rule.scala => StreamExecIntervalJoinRule.scala} | 20 +++ .../rules/physical/stream/StreamExecJoinRule.scala | 6 ++--- .../stream/StreamExecTemporalJoinRule.scala| 4 +-- ...WindowJoinUtil.scala => IntervalJoinUtil.scala} | 4 +-- .../apache/flink/table/api/stream/ExplainTest.xml | 8 +++--- .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 12 - .../{WindowJoinTest.xml => IntervalJoinTest.xml} | 26 +-- .../table/planner/plan/stream/table/JoinTest.xml | 20 +++ .../stream/sql/MiniBatchIntervalInferTest.scala| 2 +- ...WindowJoinTest.scala => IntervalJoinTest.scala} | 14 +-- ...owJoinITCase.scala => IntervalJoinITCase.scala} | 8 +++--- .../join/{ => interval}/EmitAwareCollector.java| 2 +- .../ProcTimeIntervalJoin.java} | 9 --- .../RowTimeIntervalJoin.java} | 9 --- .../TimeIntervalJoin.java} | 20 --- .../ProcTimeIntervalJoinTest.java} | 15 +-- 
.../RowTimeIntervalJoinTest.java} | 22 .../TimeIntervalStreamJoinTestBase.java} | 12 - 32 files changed, 177 insertions(+), 169 deletions(-) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/{StreamExecWindowJoin.scala => StreamExecIntervalJoin.scala} (92%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/{StreamExecWindowJoinRule.scala => StreamExecIntervalJoinRule.scala} (91%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/{WindowJoinUtil.scala => IntervalJoinUtil.scala} (99%) rename flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.xml => IntervalJoinTest.xml} (83%) rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.scala => IntervalJoinTest.scala} (97%) rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/{WindowJoinITCase.scala => IntervalJoinITCase.scala} (99%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ => interval}/EmitAwareCollector.java (96%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoin.java => interval/ProcTimeIntervalJoin.java} (86%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoin.java => interval/RowTimeIntervalJoin.java} (89%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoin.java => interval/TimeIntervalJoin.java} (97%) rename 
flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoinTest.java => interval/ProcTimeIntervalJoinTest.java} (94%) rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoinTest.java => interval/RowTimeIntervalJoinTest.java} (95%) rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoinTestBase.java => interval/TimeIntervalStreamJoinTestBase.java} (88%)
[flink] branch master updated (b48d330 -> b39ef6c)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b48d330 [hotfix][docs] Remove HADOOP_CONF_DIR add b39ef6c [FLINK-15339][table][docs] Correct the terminology of "Time-windowed Join" to "Interval Join" in Table API & SQL No new revisions were added by this update. Summary of changes: docs/dev/table/sql/queries.md | 14 +-- docs/dev/table/sql/queries.zh.md | 14 +-- docs/dev/table/streaming/joins.md | 8 +++--- docs/dev/table/streaming/joins.zh.md | 8 +++--- docs/dev/table/streaming/match_recognize.md| 4 +-- docs/dev/table/streaming/match_recognize.zh.md | 4 +-- docs/dev/table/tableApi.md | 18 +++--- docs/dev/table/tableApi.zh.md | 18 +++--- .../plan/metadata/FlinkRelMdColumnUniqueness.scala | 2 +- .../metadata/FlinkRelMdModifiedMonotonicity.scala | 2 +- .../plan/metadata/FlinkRelMdUniqueKeys.scala | 2 +- ...ndowJoin.scala => StreamExecIntervalJoin.scala} | 29 +++--- .../FlinkChangelogModeInferenceProgram.scala | 8 +++--- .../planner/plan/rules/FlinkStreamRuleSets.scala | 2 +- ...Rule.scala => StreamExecIntervalJoinRule.scala} | 20 +++ .../rules/physical/stream/StreamExecJoinRule.scala | 6 ++--- .../stream/StreamExecTemporalJoinRule.scala| 4 +-- ...WindowJoinUtil.scala => IntervalJoinUtil.scala} | 4 +-- .../apache/flink/table/api/stream/ExplainTest.xml | 8 +++--- .../plan/stream/sql/MiniBatchIntervalInferTest.xml | 12 - .../{WindowJoinTest.xml => IntervalJoinTest.xml} | 26 +-- .../table/planner/plan/stream/table/JoinTest.xml | 20 +++ .../stream/sql/MiniBatchIntervalInferTest.scala| 2 +- ...WindowJoinTest.scala => IntervalJoinTest.scala} | 14 +-- ...owJoinITCase.scala => IntervalJoinITCase.scala} | 8 +++--- .../join/{ => interval}/EmitAwareCollector.java| 2 +- .../ProcTimeIntervalJoin.java} | 9 --- .../RowTimeIntervalJoin.java} | 9 --- .../TimeIntervalJoin.java} | 20 --- .../ProcTimeIntervalJoinTest.java} | 15 +-- .../RowTimeIntervalJoinTest.java} | 
22 .../TimeIntervalStreamJoinTestBase.java} | 12 - 32 files changed, 177 insertions(+), 169 deletions(-) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/{StreamExecWindowJoin.scala => StreamExecIntervalJoin.scala} (92%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/rules/physical/stream/{StreamExecWindowJoinRule.scala => StreamExecIntervalJoinRule.scala} (91%) rename flink-table/flink-table-planner-blink/src/main/scala/org/apache/flink/table/planner/plan/utils/{WindowJoinUtil.scala => IntervalJoinUtil.scala} (99%) rename flink-table/flink-table-planner-blink/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.xml => IntervalJoinTest.xml} (83%) rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/join/{WindowJoinTest.scala => IntervalJoinTest.scala} (97%) rename flink-table/flink-table-planner-blink/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/{WindowJoinITCase.scala => IntervalJoinITCase.scala} (99%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ => interval}/EmitAwareCollector.java (96%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoin.java => interval/ProcTimeIntervalJoin.java} (86%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoin.java => interval/RowTimeIntervalJoin.java} (89%) rename flink-table/flink-table-runtime-blink/src/main/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoin.java => interval/TimeIntervalJoin.java} (97%) rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{ProcTimeBoundedStreamJoinTest.java => 
interval/ProcTimeIntervalJoinTest.java} (94%) rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{RowTimeBoundedStreamJoinTest.java => interval/RowTimeIntervalJoinTest.java} (95%) rename flink-table/flink-table-runtime-blink/src/test/java/org/apache/flink/table/runtime/operators/join/{TimeBoundedStreamJoinTestBase.java => interval/TimeIntervalStreamJoinTestBase.java} (88%)
[flink] branch release-1.11 updated: [hotfix][docs] Remove HADOOP_CONF_DIR
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new ec65e38 [hotfix][docs] Remove HADOOP_CONF_DIR ec65e38 is described below commit ec65e38159cab82beb869a1f169cce977a12d159 Author: Robert Metzger AuthorDate: Thu Jun 4 21:13:12 2020 +0200 [hotfix][docs] Remove HADOOP_CONF_DIR The Hadoop configuration page was quite confusing because it suggested that the user has to set the HADOOP_CONF_DIR as well. All jars and configs can and should be passed through HADOOP_CLASSPATH. Period. This closes #12491 --- docs/ops/deployment/hadoop.md| 21 + docs/ops/deployment/hadoop.zh.md | 21 + 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md index 71914a6..7b5e5bc 100644 --- a/docs/ops/deployment/hadoop.md +++ b/docs/ops/deployment/hadoop.md @@ -32,7 +32,7 @@ under the License. In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide Flink with the required Hadoop classes, as these are not bundled by default. -This can be done by adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. +The recommended approach is adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. Flink will use the environment variable `HADOOP_CLASSPATH` to augment the classpath that is used when starting Flink components such as the Client, @@ -44,27 +44,16 @@ that are running Flink components. When running on YARN, this is usually not a problem because the components running inside YARN will be started with the Hadoop classpaths, but it can happen that the Hadoop dependencies must be in the classpath when submitting a -job to YARN. For this, it's usually enough to do a +job to YARN. 
For this, it's usually enough to run {% highlight bash %} export HADOOP_CLASSPATH=`hadoop classpath` {% endhighlight %} -in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. +in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. The classpath returned by `hadoop classpath` also includes the Hadoop configuration directories. -Putting the Hadoop configuration in the same class path as the Hadoop libraries makes Flink pick up that configuration. - -## Referencing a Hadoop configuration - -You can reference a Hadoop configuration by setting the environment variable `HADOOP_CONF_DIR`. - -```sh -HADOOP_CONF_DIR=/path/to/etc/hadoop -``` - -Referencing the HDFS configuration in the [Flink configuration]({{ site.baseurl }}/ops/config.html#hdfs) is deprecated. - -Another way to provide the Hadoop configuration is to have it on the class path of the Flink process, see more details above. +If you are manually assembling the `HADOOP_CLASSPATH` variable, we recommend +adding the Hadoop configuration directories as well. ## Running a job locally diff --git a/docs/ops/deployment/hadoop.zh.md b/docs/ops/deployment/hadoop.zh.md index f9cf3e1..c73a000 100644 --- a/docs/ops/deployment/hadoop.zh.md +++ b/docs/ops/deployment/hadoop.zh.md @@ -32,7 +32,7 @@ under the License. In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide Flink with the required Hadoop classes, as these are not bundled by default. -This can be done by adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. +The recommended approach is adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. 
Flink will use the environment variable `HADOOP_CLASSPATH` to augment the classpath that is used when starting Flink components such as the Client, @@ -44,27 +44,16 @@ that are running Flink components. When running on YARN, this is usually not a problem because the components running inside YARN will be started with the Hadoop classpaths, but it can happen that the Hadoop dependencies must be in the classpath when submitting a -job to YARN. For this, it's usually enough to do a +job to YARN. For this, it's usually enough to run {% highlight bash %} export HADOOP_CLASSPATH=`hadoop classpath` {% endhighlight %} -in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. +in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. The classpath returned by `hadoop classpath` also includes the Hadoop configuration directories. -Putting the Hadoop configuration in the same class path
[flink] branch master updated: [hotfix][docs] Remove HADOOP_CONF_DIR
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new b48d330 [hotfix][docs] Remove HADOOP_CONF_DIR b48d330 is described below commit b48d330a9f09b1a30077dc7666dd95d942013765 Author: Robert Metzger AuthorDate: Thu Jun 4 21:13:12 2020 +0200 [hotfix][docs] Remove HADOOP_CONF_DIR The Hadoop configuration page was quite confusing because it suggested that the user has to set the HADOOP_CONF_DIR as well. All jars and configs can and should be passed through HADOOP_CLASSPATH. Period. This closes #12491 --- docs/ops/deployment/hadoop.md| 21 + docs/ops/deployment/hadoop.zh.md | 21 + 2 files changed, 10 insertions(+), 32 deletions(-) diff --git a/docs/ops/deployment/hadoop.md b/docs/ops/deployment/hadoop.md index 71914a6..7b5e5bc 100644 --- a/docs/ops/deployment/hadoop.md +++ b/docs/ops/deployment/hadoop.md @@ -32,7 +32,7 @@ under the License. In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide Flink with the required Hadoop classes, as these are not bundled by default. -This can be done by adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. +The recommended approach is adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. Flink will use the environment variable `HADOOP_CLASSPATH` to augment the classpath that is used when starting Flink components such as the Client, @@ -44,27 +44,16 @@ that are running Flink components. When running on YARN, this is usually not a problem because the components running inside YARN will be started with the Hadoop classpaths, but it can happen that the Hadoop dependencies must be in the classpath when submitting a -job to YARN. For this, it's usually enough to do a +job to YARN. 
For this, it's usually enough to run {% highlight bash %} export HADOOP_CLASSPATH=`hadoop classpath` {% endhighlight %} -in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. +in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. The classpath returned by `hadoop classpath` also includes the Hadoop configuration directories. -Putting the Hadoop configuration in the same class path as the Hadoop libraries makes Flink pick up that configuration. - -## Referencing a Hadoop configuration - -You can reference a Hadoop configuration by setting the environment variable `HADOOP_CONF_DIR`. - -```sh -HADOOP_CONF_DIR=/path/to/etc/hadoop -``` - -Referencing the HDFS configuration in the [Flink configuration]({{ site.baseurl }}/ops/config.html#hdfs) is deprecated. - -Another way to provide the Hadoop configuration is to have it on the class path of the Flink process, see more details above. +If you are manually assembling the `HADOOP_CLASSPATH` variable, we recommend +adding the Hadoop configuration directories as well. ## Running a job locally diff --git a/docs/ops/deployment/hadoop.zh.md b/docs/ops/deployment/hadoop.zh.md index f9cf3e1..c73a000 100644 --- a/docs/ops/deployment/hadoop.zh.md +++ b/docs/ops/deployment/hadoop.zh.md @@ -32,7 +32,7 @@ under the License. In order to use Hadoop features (e.g., YARN, HDFS) it is necessary to provide Flink with the required Hadoop classes, as these are not bundled by default. -This can be done by adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. +The recommended approach is adding the Hadoop classpath to Flink through the `HADOOP_CLASSPATH` environment variable. 
Flink will use the environment variable `HADOOP_CLASSPATH` to augment the classpath that is used when starting Flink components such as the Client, @@ -44,27 +44,16 @@ that are running Flink components. When running on YARN, this is usually not a problem because the components running inside YARN will be started with the Hadoop classpaths, but it can happen that the Hadoop dependencies must be in the classpath when submitting a -job to YARN. For this, it's usually enough to do a +job to YARN. For this, it's usually enough to run {% highlight bash %} export HADOOP_CLASSPATH=`hadoop classpath` {% endhighlight %} -in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. +in the shell. Note that `hadoop` is the hadoop binary and that `classpath` is an argument that will make it print the configured Hadoop classpath. The classpath returned by `hadoop classpath` also includes the Hadoop configuration directories. -Putting the Hadoop configuration in the same class path as the
[flink] branch release-1.11 updated: [FLINK-17974][docs] Extend docker documentation
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new b20ea34 [FLINK-17974][docs] Extend docker documentation b20ea34 is described below commit b20ea34d3c10254a82e46875419cd98d2986085e Author: Robert Metzger AuthorDate: Thu Jun 4 14:41:53 2020 +0200 [FLINK-17974][docs] Extend docker documentation This closes #12490 --- docs/ops/deployment/docker.md| 59 +- docs/ops/deployment/docker.zh.md | 61 ++-- 2 files changed, 91 insertions(+), 29 deletions(-) diff --git a/docs/ops/deployment/docker.md b/docs/ops/deployment/docker.md index a1775c3..2525185 100644 --- a/docs/ops/deployment/docker.md +++ b/docs/ops/deployment/docker.md @@ -46,12 +46,12 @@ For example, you can use the following aliases: * `flink:latest` → `flink:-scala_` * `flink:1.11` → `flink:1.11.-scala_2.11` -Note Prio to Flink 1.5 version, Hadoop dependencies were always bundled with Flink. +Note Prior to Flink 1.5 version, Hadoop dependencies were always bundled with Flink. You can see that certain tags include the version of Hadoop, e.g. (e.g. `-hadoop28`). Beginning with Flink 1.5, image tags that omit the Hadoop version correspond to Hadoop-free releases of Flink that do not include a bundled Hadoop distribution. -## How to run Flink image +## How to run a Flink image The Flink image contains a regular Flink distribution with its default configuration and a standard entry point script. You can run its entry point in the following modes: @@ -70,21 +70,43 @@ and deploys *TaskManagers* on demand so that you do not have to do it manually. The next chapters describe how to start a single Flink Docker container for various purposes. 
+Once you've started Flink on Docker, you can access the Flink Webfrontend on [localhost:8081](http://localhost:8081/#/overview) or submit jobs like this `./bin/flink run ./examples/streaming/TopSpeedWindowing.jar`. + +We recommend using [Docker Compose](docker.html#session-cluster-with-docker-compose) or [Docker Swarm](docker.html#session-cluster-with-docker-swarm) for deploying Flink as a Session Cluster to ease system configuration. + ### Start a Session Cluster A *Flink Session cluster* can be used to run multiple jobs. Each job needs to be submitted to the cluster after it has been deployed. -To deploy a *Flink Session cluster* with Docker, you need to start a *Flink Master* container: +To deploy a *Flink Session cluster* with Docker, you need to start a *Flink Master* container. To enable communication between the containers, we first set a required Flink configuration property and create a network: +```sh +FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" +docker network create flink-network +``` + +Then we launch the JobManager: ```sh -docker run flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %} jobmanager +docker run \ +--rm \ +--name=jobmanager \ +--network flink-network \ +-p 8081:8081 \ +--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ +flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %} jobmanager ``` and one or more *TaskManager* containers: ```sh -docker run flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %} taskmanager +docker run \ +--rm \ +--name=taskmanager \ +--network flink-network \ +--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ +flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %} taskmanager ``` + ### Start a Job Cluster A *Flink Job cluster* is a dedicated cluster which runs a single job. 
@@ -105,18 +127,26 @@ To make the **job artifacts available** locally in the container, you can the *Flink Master* and *TaskManagers*: ```sh +FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" +docker network create flink-network + docker run \ --mount type=bind,src=/host/path/to/job/artifacts1,target=/opt/flink/usrlib/artifacts1 \ --mount type=bind,src=/host/path/to/job/artifacts2,target=/opt/flink/usrlib/artifacts2 \ +--rm \ +--env FLINK_PROPERTIES="${FLINK_PROPERTIES}" \ +--name=jobmanager \ +--network flink-network \ flink:{% if site.is_stable %}{{site.version}}-scala{{site.scala_version_suffix}}{% else %}latest{% endif %} standalone-job \ --job-classname com.job.ClassName \ ---job-id \ +[--job-id ] \ [--fromSavepoint /path/to/savepoint [--allowNonRestoredState]] \ [job arguments] docker run \ --mount
[flink] branch master updated (d991b1a -> 13bae63)
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from d991b1a [FLINK-17816][metrics] Schedule latency markers with fixed delay add 13bae63 [FLINK-17974][docs] Extend docker documentation No new revisions were added by this update. Summary of changes: docs/ops/deployment/docker.md| 59 +- docs/ops/deployment/docker.zh.md | 61 ++-- 2 files changed, 91 insertions(+), 29 deletions(-)
[flink-benchmarks] branch master updated: [FLINK-17287][github] Disable merge commit button
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git The following commit(s) were added to refs/heads/master by this push: new 8b44986 [FLINK-17287][github] Disable merge commit button 8b44986 is described below commit 8b449865cf733dbb3c01e997fe44b1a5b6f82cdc Author: Nico Kruber AuthorDate: Fri Jun 5 11:46:38 2020 +0200 [FLINK-17287][github] Disable merge commit button --- .asf.yaml | 5 + 1 file changed, 5 insertions(+) diff --git a/.asf.yaml b/.asf.yaml new file mode 100644 index 0000000..930c896 --- /dev/null +++ b/.asf.yaml @@ -0,0 +1,5 @@ +github: + enabled_merge_buttons: +squash: true +merge: false +rebase: true
[flink-benchmarks] branch master updated (c1db02d -> a37619d)
This is an automated email from the ASF dual-hosted git repository. nkruber pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink-benchmarks.git. from c1db02d [init] Add ASL 2.0 License new e16896a Add skeleton for micro benchmarks twith first benchmark new 0134b6a Add KeyByBenchmark new 72afb9b Add TimeWindowBenchmark new a68dca7 Dump results to csv file new 0016349 Add benchmarks for different state backends new 469188b make KeyByBenchmark#SumReduce a static inner class new e4e9024 Extract user functions new a7947c4 Improve codespeed new 0c33a3e Extract StateBackendBenchmark new c03925f Reduce number of parameters new 90ab919 Bump number of iterations to production ready values new e1267b6 Remove fork value from pom file new 2953086 Change units in benchmarks new 35aeff7 Extract common logic for IntLong benchmarks new d42afaf Benchmark state using tumbling windows new df6a089 Use different number of operations per invocation in memory and rocks benchmarks new 621f73e Make all benchmark inherite from BenchmarkBase new 8fd1ae4 Extract BenchmarkBase for non integration Flink benchmarks new b662f8e Clean up pom.xml new ca39899 Update Flink to 1.5-SNAPSHOT new 58687db Run NetworkBenchmarks from flink new aca8216 Ignore empty parameters in benchmark name generation new 313bfd7 add benchmarks for the keyBy() operation on tuples and arrays new 4a453df Remove incorrect comment from pom new 062f96b Add README.md new 9e6eee1 [hotfix] make KeyByBenchmarks#IncreasingBaseSource more generic, creating BaseSourceWithKeyRange new 8cd86ae add SerializationFrameworkBenchmarks for Pojo, Tuple, Kryo, and Avro new 426ac4c Update default JVM options and bump default Flink version to 1.7 new a1d2ebe Add network 1000,1ms benchmark new a8bc679 cleanup pom.xml (#8) new da30375 Add broadcast network 100,100ms benchmark new 9a95e89 clarify licensing situation with respect to jmh new 84532bd add SSL network benchmarks and change overall benchmark set new 
2ddbb7a Add general remarks to the README new 21b97fe [FLINK-11986] [state backend, tests] Add micro benchmark for state operations new 8b71d9c Add support to generate shaded benchmark package to allow run specific case in command line new 5d7aec8 [hotfix] Missing quotes in example maven command in README new 97ffd65 [hotfix] Exclude state benchmark from the Flink suit new c25d921 [hotfix] Fix NoClassDefFoundError when running network related benchmarks new 136c166 Run all benchmarks once on Travis CI new ca2d06b Use maven profile to differentiate travis and jenkins run new cd16e99 Update the dependent flink version to 1.9-SNAPSHOT new 4372ba0 Add InputBenchmark new 70fb9bd Explain naming convention in the README new 9c338a9 Add couple of details to the README and a note how to pick length of a new benchmark new b2c1aa2 Add TwoInputBenchmark new 75d7b6e [FLINK-11877] Add an input idle benchmark for two input operators new caf4917 [hotfix] Fix typo in static variable new 3b55f59 [hotfix] Fix formatting of pom.xml new 9e3833a [hotfix] Add missing dependencies new ab0015d [FLINK-12818] Improve stability of twoInputMapSink benchmark new a4ae3ae [hotfix] adapt StreamNetworkThroughputBenchmarkExecutor to latest changes in Flink new c2f6793 Delete root directory when teardown the benchmark new 15064ae Let list state benchmark stable new d47b530 Update flink-tests artifact id new 6ba88de Adding benchmark for AsyncWaitOperator new 707905f [FLINK-14118] Add throughput benchmark executor for data skew scenario. (#31) new 46b7ce9 [FLINK-14118] Bump number of writer threads in network data skew Benchmark to 10 new 735bd06 [hotfix] Update flink version from 1.9 to 1.10. new 91d5e0b [hotfix] Increase max memory of the test jvm to 6g. 
new fd337c6 [FLINK-14783] Add ContinuousFileReaderOperator benchmark (#34) new cb73ac5 [FLINK-13846] Add benchmark of mapstate isEmpty new 4625672 [FLINK-14346] [serialization] Add string-heavy version of SerializationFrameworkMiniBenchmark new cd6f0a3 [hotfix] Make StateBackendContext extends FlinkEnvironmentContext for deduplication. new f54e93b [FLINK-15103] Set number of network buffers to be the same as before flip49 for all benchmarks using local executor. new 322d34d [hotfix] Bump default Flink version to 1.11-SNAPSHOT new bbac57c [FLINK-15199] Fix compile error after FLINK-14926 new e64a0e0 [FLINK-15070] Add benchmarks for blocking partition new a63a4bc fixup! [FLINK-15103] Set number of network buffers to be the same as before flip49 for all benchmarks using local executor. new cfcbb96
[flink] branch release-1.11 updated (3fe471f -> 56e9f84)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 3fe471f [FLINK-17553][table] Fix plan error when constant exists in group window key add 56e9f84 [FLINK-18042][build] Bump flink-shaded to 11.0 No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/test/util/SecureTestEnvironment.java | 4 ++-- pom.xml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-)
[flink] branch master updated (826a075 -> d991b1a)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 826a075 [FLINK-18042][build] Bump flink-shaded to 11.0 add d991b1a [FLINK-17816][metrics] Schedule latency markers with fixed delay No new revisions were added by this update. Summary of changes: .../java/org/apache/flink/streaming/api/operators/StreamSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-)
[flink] branch master updated (c0673a2 -> 826a075)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c0673a2 [FLINK-18038][statebackend] Log application-defined state backends after configuration add 826a075 [FLINK-18042][build] Bump flink-shaded to 11.0 No new revisions were added by this update. Summary of changes: .../main/java/org/apache/flink/test/util/SecureTestEnvironment.java | 4 ++-- pom.xml | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-)
[flink] branch master updated (c67109e -> c0673a2)
This is an automated email from the ASF dual-hosted git repository. chesnay pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from c67109e [FLINK-17553][table] Fix plan error when constant exists in group window key add c0673a2 [FLINK-18038][statebackend] Log application-defined state backends after configuration No new revisions were added by this update. Summary of changes: .../org/apache/flink/runtime/state/StateBackendLoader.java | 10 +- 1 file changed, 5 insertions(+), 5 deletions(-)
[flink] branch release-1.11 updated (78b3f48 -> 3fe471f)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 78b3f48 [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction add 3fe471f [FLINK-17553][table] Fix plan error when constant exists in group window key No new revisions were added by this update. Summary of changes: .../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +++ .../table/planner/plan/stream/sql/RankTest.xml | 18 ++ .../plan/stream/sql/agg/WindowAggregateTest.xml| 29 ++ .../plan/stream/sql/agg/WindowAggregateTest.scala | 14 +++ .../runtime/stream/sql/WindowAggregateITCase.scala | 28 + 5 files changed, 83 insertions(+), 10 deletions(-)
[flink] branch master updated (46d42e5 -> c67109e)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from 46d42e5 [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile stage add c67109e [FLINK-17553][table] Fix plan error when constant exists in group window key No new revisions were added by this update. Summary of changes: .../planner/plan/rules/FlinkStreamRuleSets.scala | 4 +++ .../table/planner/plan/stream/sql/RankTest.xml | 18 ++ .../plan/stream/sql/agg/WindowAggregateTest.xml| 29 ++ .../plan/stream/sql/agg/WindowAggregateTest.scala | 14 +++ .../runtime/stream/sql/WindowAggregateITCase.scala | 28 + 5 files changed, 83 insertions(+), 10 deletions(-)
[flink] branch master updated: [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile stage
This is an automated email from the ASF dual-hosted git repository. rmetzger pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/master by this push: new 46d42e5 [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile stage 46d42e5 is described below commit 46d42e5b718ea2cd914dc7feaf4425e04c3e4645 Author: Robert Metzger AuthorDate: Tue Jun 2 15:07:44 2020 +0200 [FLINK-18069][CI] Test if Scala/JavaDocs builds are passing in the compile stage This closes #12447 --- tools/ci/compile.sh | 65 ++--- 1 file changed, 42 insertions(+), 23 deletions(-) diff --git a/tools/ci/compile.sh b/tools/ci/compile.sh index b0b4803..00665ec 100755 --- a/tools/ci/compile.sh +++ b/tools/ci/compile.sh @@ -47,35 +47,54 @@ run_mvn clean install $MAVEN_OPTS -Dflink.convergence.phase=install -Pcheck-conv EXIT_CODE=$? -if [ $EXIT_CODE == 0 ]; then +if [ $EXIT_CODE != 0 ]; then echo "==" -echo "Checking scala suffixes" +echo "Compiling Flink failed." echo "==" +exit $EXIT_CODE +fi -${CI_DIR}/verify_scala_suffixes.sh "${PROFILE}" -EXIT_CODE=$? -else -echo "==" -echo "Previous build failure detected, skipping scala-suffixes check." -echo "==" +echo " Checking Javadocs " + +# use the same invocation as on buildbot (https://svn.apache.org/repos/infra/infrastructure/buildbot/aegis/buildmaster/master1/projects/flink.conf) +run_mvn javadoc:aggregate -Paggregate-scaladoc -DadditionalJOption='-Xdoclint:none' \ + -Dmaven.javadoc.failOnError=false -Dcheckstyle.skip=true -Denforcer.skip=true \ + -Dheader=someTestHeader > javadoc.out +EXIT_CODE=$? +if [ $EXIT_CODE != 0 ] ; then + echo "ERROR in Javadocs. 
Printing full output:" + cat javadoc.out ; rm javadoc.out + exit $EXIT_CODE fi -if [ $EXIT_CODE == 0 ]; then -check_shaded_artifacts -EXIT_CODE=$(($EXIT_CODE+$?)) -check_shaded_artifacts_s3_fs hadoop -EXIT_CODE=$(($EXIT_CODE+$?)) -check_shaded_artifacts_s3_fs presto -EXIT_CODE=$(($EXIT_CODE+$?)) -check_shaded_artifacts_connector_elasticsearch 5 -EXIT_CODE=$(($EXIT_CODE+$?)) -check_shaded_artifacts_connector_elasticsearch 6 -EXIT_CODE=$(($EXIT_CODE+$?)) -else -echo "==" -echo "Previous build failure detected, skipping shaded dependency check." -echo "==" +echo " Checking Scaladocs " + +cd flink-scala +run_mvn scala:doc 2> scaladoc.out +EXIT_CODE=$? +if [ $EXIT_CODE != 0 ] ; then + echo "ERROR in Scaladocs. Printing full output:" + cat scaladoc.out ; rm scaladoc.out + exit $EXIT_CODE fi +cd .. + +echo " Checking scala suffixes " + +${CI_DIR}/verify_scala_suffixes.sh "${PROFILE}" || exit $? + +echo " Checking shaded dependencies " + +check_shaded_artifacts +EXIT_CODE=$(($EXIT_CODE+$?)) +check_shaded_artifacts_s3_fs hadoop +EXIT_CODE=$(($EXIT_CODE+$?)) +check_shaded_artifacts_s3_fs presto +EXIT_CODE=$(($EXIT_CODE+$?)) +check_shaded_artifacts_connector_elasticsearch 5 +EXIT_CODE=$(($EXIT_CODE+$?)) +check_shaded_artifacts_connector_elasticsearch 6 +EXIT_CODE=$(($EXIT_CODE+$?)) exit $EXIT_CODE
[flink] branch release-1.11 updated: [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 78b3f48 [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction 78b3f48 is described below commit 78b3f48d5d13cb64cecccd833cc1dd18c8d520e9 Author: Jingsong Lee AuthorDate: Fri Jun 5 15:50:54 2020 +0800 [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction This closes #12497 --- .../flink/connectors/hive/read/HiveContinuousMonitoringFunction.java | 4 ++-- .../java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java index cdd1e76..893e880 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/connectors/hive/read/HiveContinuousMonitoringFunction.java @@ -171,13 +171,13 @@ public class HiveContinuousMonitoringFunction public void initializeState(FunctionInitializationContext context) throws Exception { this.currReadTimeState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( - "partition-monitoring-state", + "current-read-time-state", LongSerializer.INSTANCE ) ); this.distinctPartsState = context.getOperatorStateStore().getListState( new ListStateDescriptor<>( - "partition-monitoring-state", + "distinct-partitions-state", new ListSerializer<>(new ListSerializer<>(StringSerializer.INSTANCE)) ) ); diff --git 
a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java index 317943a..32a5cd4 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java @@ -464,6 +464,7 @@ public class HiveTableSourceITCase extends BatchAbstractTestBase { final String dbName = "source_db"; final String tblName = "stream_test"; StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); + env.enableCheckpointing(100); StreamTableEnvironment tEnv = HiveTestUtils.createTableEnvWithBlinkPlannerStreamMode(env, SqlDialect.HIVE); tEnv.registerCatalog(catalogName, hiveCatalog); tEnv.useCatalog(catalogName);
[flink] branch master updated (b28cca2 -> d3dbf61)
This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from b28cca2 [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client add d3dbf61 [FLINK-18142][hive] Wrong state names in HiveContinuousMonitoringFunction No new revisions were added by this update. Summary of changes: .../flink/connectors/hive/read/HiveContinuousMonitoringFunction.java | 4 ++-- .../java/org/apache/flink/connectors/hive/HiveTableSourceITCase.java | 1 + 2 files changed, 3 insertions(+), 2 deletions(-)
[flink] branch release-1.11 updated: [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client
This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git The following commit(s) were added to refs/heads/release-1.11 by this push: new 89be0b1 [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client 89be0b1 is described below commit 89be0b1c12da7a0c72f0db304479767db8f72488 Author: godfrey he AuthorDate: Fri Jun 5 15:13:09 2020 +0800 [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client This closes #12435 --- .../apache/flink/table/client/cli/CliClient.java | 111 + .../apache/flink/table/client/cli/CliStrings.java | 8 +- .../flink/table/client/cli/CliClientTest.java | 14 +++ .../flink/table/client/cli/TestingExecutor.java| 15 ++- .../table/client/cli/TestingExecutorBuilder.java | 12 ++- 5 files changed, 68 insertions(+), 92 deletions(-) diff --git a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java index ffb2d0e..b1acc76 100644 --- a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java +++ b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java @@ -313,10 +313,10 @@ public class CliClient { callInsert(cmdCall); break; case CREATE_TABLE: - callCreateTable(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_CREATED); break; case DROP_TABLE: - callDropTable(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_TABLE_REMOVED); break; case CREATE_VIEW: callCreateView(cmdCall); @@ -325,28 +325,37 @@ public class CliClient { callDropView(cmdCall); break; case CREATE_FUNCTION: - callCreateFunction(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_FUNCTION_CREATED); break; case DROP_FUNCTION: - callDropFunction(cmdCall); + 
callDdl(cmdCall.operands[0], CliStrings.MESSAGE_FUNCTION_REMOVED); break; case ALTER_FUNCTION: - callAlterFunction(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_ALTER_FUNCTION_SUCCEEDED, + CliStrings.MESSAGE_ALTER_FUNCTION_FAILED); break; case SOURCE: callSource(cmdCall); break; case CREATE_DATABASE: - callCreateDatabase(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_DATABASE_CREATED); break; case DROP_DATABASE: - callDropDatabase(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_DATABASE_REMOVED); break; case ALTER_DATABASE: - callAlterDatabase(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_ALTER_DATABASE_SUCCEEDED, + CliStrings.MESSAGE_ALTER_DATABASE_FAILED); break; case ALTER_TABLE: - callAlterTable(cmdCall); + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_ALTER_TABLE_SUCCEEDED, + CliStrings.MESSAGE_ALTER_TABLE_FAILED); + break; + case CREATE_CATALOG: + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_CATALOG_CREATED); + break; + case DROP_CATALOG: + callDdl(cmdCall.operands[0], CliStrings.MESSAGE_CATALOG_REMOVED); break; default: throw new SqlClientException("Unsupported command: " + cmdCall.command); @@ -583,24 +592,6 @@ public class CliClient { return true;
[flink] branch master updated (e2cffe1 -> b28cca2)
This is an automated email from the ASF dual-hosted git repository. jark pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from e2cffe1 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. add b28cca2 [FLINK-18059][sql-client] Fix create/drop catalog statement can not be executed in sql client No new revisions were added by this update. Summary of changes: .../apache/flink/table/client/cli/CliClient.java | 111 + .../apache/flink/table/client/cli/CliStrings.java | 8 +- .../flink/table/client/cli/CliClientTest.java | 14 +++ .../flink/table/client/cli/TestingExecutor.java| 15 ++- .../table/client/cli/TestingExecutorBuilder.java | 12 ++- 5 files changed, 68 insertions(+), 92 deletions(-)
[flink] 02/02: [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 10b284201337750422b663e8b69b7d867825da40 Author: Xintong Song AuthorDate: Thu May 28 16:23:31 2020 +0800 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. This closes #12399. --- .../TaskSubmissionTestEnvironment.java | 25 +++--- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 14adabf..e79b720 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { jobMasterGateway = testingJobMasterGateway; } - TaskManagerActions taskManagerActions; - if (taskManagerActionListeners.size() == 0) { - taskManagerActions = new NoOpTaskManagerActions(); - } else { - TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(taskSlotTable, jobMasterGateway); - for (Tuple3> listenerTuple : taskManagerActionListeners) { - testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); - } - taskManagerActions = testTaskManagerActions; - } - this.testingRpcService = testingRpcService; final DefaultJobTable jobTable = DefaultJobTable.create(); - registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( false, @@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable { taskExecutor.waitUntilStarted(); this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); + + TaskManagerActions taskManagerActions; + if (taskManagerActionListeners.size() == 0) { + taskManagerActions = new NoOpTaskManagerActions(); + } else { + TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway); + for (Tuple3> listenerTuple : taskManagerActionListeners) { + testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); + } + taskManagerActions = testTaskManagerActions; + } + + registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); } static void registerJobMasterConnection(
[flink] branch release-1.11 updated (37f436e -> 10b2842)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git. from 37f436e [FLINK-18143][python] Fix Python meter metric incorrect value problem (#12498) new 359ab03 [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread. new 10b2842 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../TaskSubmissionTestEnvironment.java | 34 ++-- .../runtime/taskexecutor/TestingTaskExecutor.java | 5 + .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 + 3 files changed, 230 insertions(+), 15 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git commit 359ab037349145b8eaadbd476d2f1a98bdab45ed Author: Xintong Song AuthorDate: Thu May 28 16:11:31 2020 +0800 [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread. --- .../TaskSubmissionTestEnvironment.java | 9 +- .../runtime/taskexecutor/TestingTaskExecutor.java | 5 + .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 + 3 files changed, 217 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 824e0f0..14adabf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; @@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { private final TestingHighAvailabilityServices haServices; private final TemporaryFolder temporaryFolder; - private final TaskSlotTable taskSlotTable; + private final ThreadSafeTaskSlotTable threadSafeTaskSlotTable; private final JobMasterId jobMasterId; private TestingTaskExecutor 
taskExecutor; @@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { this.jobMasterId = jobMasterId; - this.taskSlotTable = slotSize > 0 ? + final TaskSlotTable taskSlotTable = slotSize > 0 ? TaskSlotUtils.createTaskSlotTable(slotSize) : TestingTaskSlotTable .newBuilder() @@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { taskExecutor.start(); taskExecutor.waitUntilStarted(); + + this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); } static void registerJobMasterConnection( @@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { } public TaskSlotTable getTaskSlotTable() { - return taskSlotTable; + return threadSafeTaskSlotTable; } public JobMasterId getJobMasterId() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index 7a4a01f..95b2f2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcService; import javax.annotation.Nullable; @@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor { void waitUntilStarted() { startFuture.join(); } + + MainThreadExecutable getMainThreadExecutableForTesting() { + return this.rpcServer; + } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java new file mode 100644 index 0000000..0b9b668 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF
[flink] 01/02: [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit 84c290581694f17e08f9ba91be6eac70264b9d70 Author: Xintong Song AuthorDate: Thu May 28 16:11:31 2020 +0800 [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread. --- .../TaskSubmissionTestEnvironment.java | 9 +- .../runtime/taskexecutor/TestingTaskExecutor.java | 5 + .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 + 3 files changed, 217 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 824e0f0..14adabf 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -51,6 +51,7 @@ import org.apache.flink.runtime.taskexecutor.rpc.RpcResultPartitionConsumableNot import org.apache.flink.runtime.taskexecutor.slot.TaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TaskSlotUtils; import org.apache.flink.runtime.taskexecutor.slot.TestingTaskSlotTable; +import org.apache.flink.runtime.taskexecutor.slot.ThreadSafeTaskSlotTable; import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.NoOpTaskManagerActions; import org.apache.flink.runtime.taskmanager.Task; @@ -91,7 +92,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { private final TestingHighAvailabilityServices haServices; private final TemporaryFolder temporaryFolder; - private final TaskSlotTable taskSlotTable; + private final ThreadSafeTaskSlotTable threadSafeTaskSlotTable; private final JobMasterId jobMasterId; private TestingTaskExecutor 
taskExecutor; @@ -116,7 +117,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { this.jobMasterId = jobMasterId; - this.taskSlotTable = slotSize > 0 ? + final TaskSlotTable taskSlotTable = slotSize > 0 ? TaskSlotUtils.createTaskSlotTable(slotSize) : TestingTaskSlotTable .newBuilder() @@ -167,6 +168,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { taskExecutor.start(); taskExecutor.waitUntilStarted(); + + this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); } static void registerJobMasterConnection( @@ -198,7 +201,7 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { } public TaskSlotTable getTaskSlotTable() { - return taskSlotTable; + return threadSafeTaskSlotTable; } public JobMasterId getJobMasterId() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java index 7a4a01f..95b2f2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.io.network.partition.TaskExecutorPartitionTracker; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.MainThreadExecutable; import org.apache.flink.runtime.rpc.RpcService; import javax.annotation.Nullable; @@ -80,4 +81,8 @@ class TestingTaskExecutor extends TaskExecutor { void waitUntilStarted() { startFuture.join(); } + + MainThreadExecutable getMainThreadExecutableForTesting() { + return this.rpcServer; + } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java new file mode 100644 index 0000000..0b9b668 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses
[flink] branch master updated (dcd764a -> e2cffe1)
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a change to branch master in repository https://gitbox.apache.org/repos/asf/flink.git. from dcd764a [FLINK-18143][python] Fix Python meter metric incorrect value problem (#12498) new 84c2905 [FLINK-15687][runtime][test] Fix accessing TaskSlotTable via TaskSubmissionTestEnvironment not in RPC main thread. new e2cffe1 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. The 2 revisions listed above as "new" are entirely new to this repository and will be described in separate emails. The revisions listed as "add" were already present in the repository and have only been added to this reference. Summary of changes: .../TaskSubmissionTestEnvironment.java | 34 ++-- .../runtime/taskexecutor/TestingTaskExecutor.java | 5 + .../taskexecutor/slot/ThreadSafeTaskSlotTable.java | 206 + 3 files changed, 230 insertions(+), 15 deletions(-) create mode 100644 flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/slot/ThreadSafeTaskSlotTable.java
[flink] 02/02: [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.
This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git commit e2cffe19a044d6decfae83e14ce59eca1e4c449f Author: Xintong Song AuthorDate: Thu May 28 16:23:31 2020 +0800 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. This closes #12399. --- .../TaskSubmissionTestEnvironment.java | 25 +++--- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 14adabf..e79b720 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { jobMasterGateway = testingJobMasterGateway; } - TaskManagerActions taskManagerActions; - if (taskManagerActionListeners.size() == 0) { - taskManagerActions = new NoOpTaskManagerActions(); - } else { - TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(taskSlotTable, jobMasterGateway); - for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) { - testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); - } - taskManagerActions = testTaskManagerActions; - } - this.testingRpcService = testingRpcService; final DefaultJobTable jobTable = DefaultJobTable.create(); - registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( false, @@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable { taskExecutor.waitUntilStarted(); this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); + + TaskManagerActions taskManagerActions; + if (taskManagerActionListeners.size() == 0) { + taskManagerActions = new NoOpTaskManagerActions(); + } else { + TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway); + for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) { + testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); + } + taskManagerActions = testTaskManagerActions; + } + + registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); } static void registerJobMasterConnection(