buildbot failure on flink-docs-release-0.9
The Buildbot has detected a new failure on builder flink-docs-release-0.9 while building.

Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-0.9/builds/695

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist:

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-release-1.0
The Buildbot has detected a new failure on builder flink-docs-release-1.0 while building.

Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-1.0/builds/454

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.0' triggered this build
Build Source Stamp: [branch release-1.0] HEAD
Blamelist:

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-release-1.1
The Buildbot has detected a new failure on builder flink-docs-release-1.1 while building.

Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-1.1/builds/300

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.1' triggered this build
Build Source Stamp: [branch release-1.1] HEAD
Blamelist:

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-release-1.2
The Buildbot has detected a new failure on builder flink-docs-release-1.2 while building.

Full details are available at:
    https://ci.apache.org/builders/flink-docs-release-1.2/builds/124

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.2' triggered this build
Build Source Stamp: [branch release-1.2] HEAD
Blamelist:

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot
buildbot failure on flink-docs-master
The Buildbot has detected a new failure on builder flink-docs-master while building.

Full details are available at:
    https://ci.apache.org/builders/flink-docs-master/builds/717

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered this build
Build Source Stamp: [branch master] HEAD
Blamelist:

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot
[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54b88d71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54b88d71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54b88d71

Branch: refs/heads/release-1.3
Commit: 54b88d71c78a670762e152b337c04a2e68e8481d
Parents: 17a4bb1
Author: zentol
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/54b88d71/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor {
 	private long lastEmittedWatermark1;
 	private long lastEmittedWatermark2;
 
+	private Counter numRecordsIn;
+
 	private boolean isFinished;
 
 	@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@
 				else {
 					StreamRecord record = recordOrWatermark.asRecord();
 					synchronized (lock) {
+						numRecordsIn.inc();
 						streamOperator.setKeyContextElement1(record);
 						streamOperator.processElement1(record);
 					}
@@ -256,6 +264,7 @@
 				else {
 					StreamRecord record = recordOrWatermark.asRecord();
 					synchronized (lock) {
+						numRecordsIn.inc();
 						streamOperator.setKeyContextElement2(record);
 						streamOperator.processElement2(record);
 					}
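The diff above follows a common lazy-initialization pattern: the counter is looked up once on the first processInput() call (the metric group may not be fully wired when the processor is constructed) and then incremented per record under the checkpoint lock. A minimal, self-contained Java sketch of that pattern; SimpleCounter, MetricGroup, and InputProcessor are illustrative stand-ins, not Flink's actual metrics API:

    import java.util.Map;
    import java.util.concurrent.ConcurrentHashMap;
    import java.util.concurrent.atomic.LongAdder;

    // Illustrative stand-in for a metrics counter.
    final class SimpleCounter {
        private final LongAdder count = new LongAdder();
        void inc() { count.increment(); }
        long getCount() { return count.sum(); }
    }

    // Illustrative stand-in for a metric group / registry.
    final class MetricGroup {
        private final Map<String, SimpleCounter> counters = new ConcurrentHashMap<>();
        SimpleCounter counter(String name) {
            return counters.computeIfAbsent(name, n -> new SimpleCounter());
        }
    }

    final class InputProcessor {
        private final MetricGroup metrics;
        private SimpleCounter numRecordsIn; // resolved lazily, as in the commit above

        InputProcessor(MetricGroup metrics) { this.metrics = metrics; }

        void processRecord(Object record) {
            // Lazy lookup: resolve the counter on the first record
            // instead of in the constructor.
            if (numRecordsIn == null) {
                numRecordsIn = metrics.counter("numRecordsIn");
            }
            numRecordsIn.inc(); // one increment per incoming record
            // ... forward the record to the operator ...
        }
    }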
[3/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints
[hotfix][rat] Add exclusion for all test snapshots/savepoints

This closes #3854.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17a4bb14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17a4bb14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17a4bb14

Branch: refs/heads/release-1.3
Commit: 17a4bb14821fd107131e1051b57326e43d660195
Parents: c62553c
Author: zentol
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   6 +++---
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |   8
 ...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
 ...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
 ...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
 .../FlinkKinesisConsumerMigrationTest.java      |   2 +-
 ...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
 ...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
 .../cep/operator/CEPMigration11to13Test.java    |  10 +-
 .../test/resources/cep-branching-1_2-snapshot   | Bin 0 -> 6736 bytes
 .../test/resources/cep-branching-snapshot-1.2   | Bin 6736 -> 0 bytes
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 0 -> 5612 bytes
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 5612 -> 0 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 0 -> 3274 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 3274 -> 0 bytes
 .../resources/cep-single-pattern-1.2-snapshot   | Bin 0 -> 3311 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 3311 -> 0 bytes
 .../test/resources/cep-starting-1.2-snapshot    | Bin 0 -> 6526 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 6526 -> 0 bytes
 ...atefulJobSavepointFrom11MigrationITCase.java |   4 ++--
 ...atefulJobSavepointFrom12MigrationITCase.java |   4 ++--
 ...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
 pom.xml                                         |  19 ++-
 27 files changed, 19 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/17a4bb14/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
 		// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+		// the expected state in "kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
 		// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
[1/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil
Repository: flink
Updated Branches:
  refs/heads/release-1.3 c62553c00 -> 5fde739fd

[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fde739f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fde739f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fde739f

Branch: refs/heads/release-1.3
Commit: 5fde739fd2b040a90d42a6a73f1d119648e863a7
Parents: c3ab5c8
Author: zjureel
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../streaming/util/OperatorSnapshotUtil.java | 162 ++-
 1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5fde739f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
 	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
-		DataOutputStream dos = new DataOutputStream(out);
-
-		dos.writeInt(state.getOperatorChainIndex());
-
-		SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
-		Collection rawOperatorState = state.getRawOperatorState();
-		if (rawOperatorState != null) {
-			dos.writeInt(rawOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+		try (DataOutputStream dos = new DataOutputStream(out)) {
+
+			dos.writeInt(state.getOperatorChainIndex());
+
+			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+			Collection rawOperatorState = state.getRawOperatorState();
+			if (rawOperatorState != null) {
+				dos.writeInt(rawOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection managedOperatorState = state.getManagedOperatorState();
-		if (managedOperatorState != null) {
-			dos.writeInt(managedOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			Collection managedOperatorState = state.getManagedOperatorState();
+			if (managedOperatorState != null) {
+				dos.writeInt(managedOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection rawKeyedState = state.getRawKeyedState();
-		if (rawKeyedState != null) {
-			dos.writeInt(rawKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle :
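The fix above swaps manual stream handling for try-with-resources, which guarantees close() runs on both normal exit and exception paths. A minimal Java sketch of the same idiom; the record layout here (a chain index plus an optional array of offsets) is hypothetical, not the OperatorSnapshotUtil wire format:

    import java.io.DataOutputStream;
    import java.io.FileOutputStream;
    import java.io.IOException;

    public final class SnapshotWriter {

        // try-with-resources closes the stream even if a write throws;
        // closing the DataOutputStream also closes the wrapped FileOutputStream.
        public static void write(int chainIndex, long[] offsets, String path) throws IOException {
            try (DataOutputStream dos = new DataOutputStream(new FileOutputStream(path))) {
                dos.writeInt(chainIndex);
                if (offsets != null) {
                    dos.writeInt(offsets.length);
                    for (long offset : offsets) {
                        dos.writeLong(offset);
                    }
                } else {
                    dos.writeInt(-1); // no state, not even an empty list
                }
            }
        }
    }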
[4/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min for local channels
[FLINK-6586] InputGateMetrics return 0 as min for local channels

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ab5c82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ab5c82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ab5c82

Branch: refs/heads/release-1.3
Commit: c3ab5c8253b32bddc3fb9bf0c1085813e7f97e2f
Parents: 5d1cda5
Author: zentol
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 .../io/network/partition/consumer/InputGateMetrics.java | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c3ab5c82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
 		int min = Integer.MAX_VALUE;
 
 		Collection channels = inputGate.getInputChannels().values();
-		if (channels.isEmpty()) {
-			// meaningful value when no channels exist:
-			return 0;
-		}
 
 		for (InputChannel channel : channels) {
 			if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
 			}
 		}
 
+		if (min == Integer.MAX_VALUE) { // in case all channels are local, or the channel collection was empty
+			return 0;
+		}
 		return min;
 	}
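The fix moves the "no data" check after the loop: because only remote channels contribute a value, a gate with only local channels previously left min at Integer.MAX_VALUE. Checking the sentinel after the loop covers both the empty collection and the all-filtered-out case. A short Java sketch of that sentinel pattern, with a plain integer collection standing in for the filtered channel queue lengths:

    import java.util.Arrays;
    import java.util.Collection;

    public final class MinQueueLength {

        // Returns the minimum of the given queue lengths, or 0 if none qualify.
        // The sentinel check AFTER the loop also handles inputs where every
        // element was skipped (in the commit above: non-remote channels).
        static int minRemoteQueueLength(Collection<Integer> remoteQueueLengths) {
            int min = Integer.MAX_VALUE;
            for (int len : remoteQueueLengths) {
                min = Math.min(min, len);
            }
            if (min == Integer.MAX_VALUE) { // nothing contributed a value
                return 0;
            }
            return min;
        }

        public static void main(String[] args) {
            System.out.println(minRemoteQueueLength(Arrays.asList(3, 7, 2))); // 2
            System.out.println(minRemoteQueueLength(Arrays.asList()));        // 0
        }
    }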
[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs
[FLINK-6639][docs] fix code tabs in CEP docs

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d1cda52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d1cda52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d1cda52

Branch: refs/heads/release-1.3
Commit: 5d1cda52e58b53a0ae9b3e9a691102617a475aff
Parents: 54b88d7
Author: David Anderson
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol
Committed: Fri May 19 21:09:08 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/5d1cda52/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern nonStrictNext = start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
+
 For non-strict contiguity one can specify if only the first succeeding matching event
 will be matched, or all. In the latter case multiple matches will be emitted for the
 same beginning.
[3/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min for local channels
[FLINK-6586] InputGateMetrics return 0 as min for local channels

This closes #3907.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17ec6f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17ec6f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17ec6f02

Branch: refs/heads/master
Commit: 17ec6f020b779efe9152456f4ef33f6f802e4f67
Parents: fadc026
Author: zentol
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../io/network/partition/consumer/InputGateMetrics.java | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/17ec6f02/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
 		int min = Integer.MAX_VALUE;
 
 		Collection channels = inputGate.getInputChannels().values();
-		if (channels.isEmpty()) {
-			// meaningful value when no channels exist:
-			return 0;
-		}
 
 		for (InputChannel channel : channels) {
 			if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
 			}
 		}
 
+		if (min == Integer.MAX_VALUE) { // in case all channels are local, or the channel collection was empty
+			return 0;
+		}
 		return min;
 	}
[1/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints
Repository: flink
Updated Branches:
  refs/heads/master 8e3213678 -> 65fdadac8

[hotfix][rat] Add exclusion for all test snapshots/savepoints

This closes #3854.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b485307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b485307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b485307

Branch: refs/heads/master
Commit: 4b485307800d04af460fec29f6f2b34b1ff189b1
Parents: 8e32136
Author: zentol
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol
Committed: Fri May 19 21:08:33 2017 +0200

----------------------------------------------------------------------
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   6 +++---
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |   8
 ...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
 ...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
 ...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
 .../FlinkKinesisConsumerMigrationTest.java      |   2 +-
 ...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
 ...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
 .../cep/operator/CEPMigration11to13Test.java    |  10 +-
 .../test/resources/cep-branching-1_2-snapshot   | Bin 0 -> 6736 bytes
 .../test/resources/cep-branching-snapshot-1.2   | Bin 6736 -> 0 bytes
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 0 -> 5612 bytes
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 5612 -> 0 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 0 -> 3274 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 3274 -> 0 bytes
 .../resources/cep-single-pattern-1.2-snapshot   | Bin 0 -> 3311 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 3311 -> 0 bytes
 .../test/resources/cep-starting-1.2-snapshot    | Bin 0 -> 6526 bytes
 .../test/resources/cep-starting-snapshot-1.2    | Bin 6526 -> 0 bytes
 ...atefulJobSavepointFrom11MigrationITCase.java |   4 ++--
 ...atefulJobSavepointFrom12MigrationITCase.java |   4 ++--
 ...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
 pom.xml                                         |  19 ++-
 27 files changed, 19 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
 		// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
 		testHarness.setup();
 		// restore state from binary snapshot file using legacy method
 		testHarness.initializeStateFromLegacyCheckpoint(
-			getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+			getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
 		testHarness.open();
 
-		// the expected state in "kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+		// the expected state in "kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
 		// since the state is empty, the consumer should reflect on the startup mode to determine start offsets.
[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs
[FLINK-6639][docs] fix code tabs in CEP docs

This closes #3952.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fadc026b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fadc026b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fadc026b

Branch: refs/heads/master
Commit: fadc026bf1e90cd001bd442e5bca595eb69907cf
Parents: 8ccaffe
Author: David Anderson
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/fadc026b/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern nonStrictNext = start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
+
 For non-strict contiguity one can specify if only the first succeeding matching event
 will be matched, or all. In the latter case multiple matches will be emitted for the
 same beginning.
[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

This closes #3950.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3

Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +
 1 file changed, 9 insertions(+)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor {
 	private long lastEmittedWatermark1;
 	private long lastEmittedWatermark2;
 
+	private Counter numRecordsIn;
+
 	private boolean isFinished;
 
 	@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor {
 		if (isFinished) {
 			return false;
 		}
+		if (numRecordsIn == null) {
+			numRecordsIn = ((OperatorMetricGroup) streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+		}
 
 		while (true) {
 			if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@
 				else {
 					StreamRecord record = recordOrWatermark.asRecord();
 					synchronized (lock) {
+						numRecordsIn.inc();
 						streamOperator.setKeyContextElement1(record);
 						streamOperator.processElement1(record);
 					}
@@ -256,6 +264,7 @@
 				else {
 					StreamRecord record = recordOrWatermark.asRecord();
 					synchronized (lock) {
+						numRecordsIn.inc();
 						streamOperator.setKeyContextElement2(record);
 						streamOperator.processElement2(record);
 					}
[4/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

This closes #3904.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac

Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol
Committed: Fri May 19 21:08:34 2017 +0200

----------------------------------------------------------------------
 .../streaming/util/OperatorSnapshotUtil.java | 162 ++-
 1 file changed, 82 insertions(+), 80 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
----------------------------------------------------------------------
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
 	public static void writeStateHandle(OperatorStateHandles state, String path) throws IOException {
 		FileOutputStream out = new FileOutputStream(path);
-		DataOutputStream dos = new DataOutputStream(out);
-
-		dos.writeInt(state.getOperatorChainIndex());
-
-		SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
-
-		Collection rawOperatorState = state.getRawOperatorState();
-		if (rawOperatorState != null) {
-			dos.writeInt(rawOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+
+		try (DataOutputStream dos = new DataOutputStream(out)) {
+
+			dos.writeInt(state.getOperatorChainIndex());
+
+			SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(), dos);
+
+			Collection rawOperatorState = state.getRawOperatorState();
+			if (rawOperatorState != null) {
+				dos.writeInt(rawOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : rawOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection managedOperatorState = state.getManagedOperatorState();
-		if (managedOperatorState != null) {
-			dos.writeInt(managedOperatorState.size());
-			for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
-				SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+			Collection managedOperatorState = state.getManagedOperatorState();
+			if (managedOperatorState != null) {
+				dos.writeInt(managedOperatorState.size());
+				for (OperatorStateHandle operatorStateHandle : managedOperatorState) {
+					SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+				}
+			} else {
+				// this means no states, not even an empty list
+				dos.writeInt(-1);
 			}
-		} else {
-			// this means no states, not even an empty list
-			dos.writeInt(-1);
-		}
 
-		Collection rawKeyedState = state.getRawKeyedState();
-		if (rawKeyedState != null) {
-			dos.writeInt(rawKeyedState.size());
-			for (KeyedStateHandle keyedStateHandle : rawKeyedState) {
-				SavepointV1Serializ
flink git commit: [FLINK-6634] [cep] NFASerializer serializes ComputationState counter.
Repository: flink
Updated Branches:
  refs/heads/release-1.3 f6a596fe3 -> c62553c00

[FLINK-6634] [cep] NFASerializer serializes ComputationState counter.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c62553c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c62553c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c62553c0

Branch: refs/heads/release-1.3
Commit: c62553c00d68c1d17078522652fd5ffbaf6bbb90
Parents: f6a596f
Author: kkloudas
Authored: Fri May 19 10:40:44 2017 +0200
Committer: kkloudas
Committed: Fri May 19 16:28:56 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  7 ++-
 .../flink/cep/operator/CEPOperatorTest.java     | 50 +++-
 2 files changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/c62553c0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a977a7f..ff5a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -930,6 +930,7 @@ public class NFA implements Serializable {
 			timestampSerializer.serialize(computationState.getTimestamp(), target);
 			versionSerializer.serialize(computationState.getVersion(), target);
 			timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+			target.writeInt(computationState.getCounter());
 
 			if (computationState.getEvent() == null) {
 				target.writeBoolean(false);
@@ -963,6 +964,7 @@ public class NFA implements Serializable {
 				long timestamp = timestampSerializer.deserialize(source);
 				DeweyNumber version = versionSerializer.deserialize(source);
 				long startTimestamp = timestampSerializer.deserialize(source);
+				int counter = source.readInt();
 
 				T event = null;
 				if (source.readBoolean()) {
@@ -970,7 +972,7 @@
 				}
 
 				computationStates.add(ComputationState.createState(
-					nfa, state, prevState, event, 0, timestamp, version, startTimestamp));
+					nfa, state, prevState, event, counter, timestamp, version, startTimestamp));
 			}
 
 			nfa.computationStates = computationStates;
@@ -1028,6 +1030,9 @@
 			long startTimestamp = timestampSerializer.deserialize(source);
 			timestampSerializer.serialize(startTimestamp, target);
 
+			int counter = source.readInt();
+			target.writeInt(counter);
+
 			boolean hasEvent = source.readBoolean();
 			target.writeBoolean(hasEvent);
 			if (hasEvent) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c62553c0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 41593b0..95e3a37 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -139,6 +140,8 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
+	@Ignore
+	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
 		String rocksDbPath = temp
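The serializer change illustrates a general rule for binary formats: a newly added field must be written, read, and copied at the same position, or every later field drifts out of alignment on deserialization. A compact Java sketch of a symmetric write/read pair; the StateRecord layout is hypothetical, not the CEP NFA's real format:

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInput;
    import java.io.DataInputStream;
    import java.io.DataOutput;
    import java.io.DataOutputStream;
    import java.io.IOException;

    // Hypothetical computation-state record.
    final class StateRecord {
        final long timestamp;
        final long startTimestamp;
        final int counter;

        StateRecord(long timestamp, long startTimestamp, int counter) {
            this.timestamp = timestamp;
            this.startTimestamp = startTimestamp;
            this.counter = counter;
        }

        // Write side: the counter goes right after the timestamps...
        void serialize(DataOutput out) throws IOException {
            out.writeLong(timestamp);
            out.writeLong(startTimestamp);
            out.writeInt(counter);
        }

        // ...and the read side mirrors the write order field for field.
        static StateRecord deserialize(DataInput in) throws IOException {
            long timestamp = in.readLong();
            long startTimestamp = in.readLong();
            int counter = in.readInt();
            return new StateRecord(timestamp, startTimestamp, counter);
        }

        public static void main(String[] args) throws IOException {
            ByteArrayOutputStream buf = new ByteArrayOutputStream();
            new StateRecord(42L, 7L, 3).serialize(new DataOutputStream(buf));
            StateRecord back = deserialize(new DataInputStream(new ByteArrayInputStream(buf.toByteArray())));
            System.out.println(back.counter); // 3
        }
    }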
flink git commit: [FLINK-6634] [cep] NFASerializer serializes ComputationState counter.
Repository: flink
Updated Branches:
  refs/heads/master 8814ba767 -> 8e3213678

[FLINK-6634] [cep] NFASerializer serializes ComputationState counter.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e321367
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e321367
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e321367

Branch: refs/heads/master
Commit: 8e32136783a1a3db17d06c1fd9d012dcc4e458aa
Parents: 8814ba7
Author: kkloudas
Authored: Fri May 19 10:40:44 2017 +0200
Committer: kkloudas
Committed: Fri May 19 16:23:33 2017 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  7 ++-
 .../flink/cep/operator/CEPOperatorTest.java     | 50 +++-
 2 files changed, 43 insertions(+), 14 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8e321367/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a977a7f..ff5a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -930,6 +930,7 @@ public class NFA implements Serializable {
 			timestampSerializer.serialize(computationState.getTimestamp(), target);
 			versionSerializer.serialize(computationState.getVersion(), target);
 			timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+			target.writeInt(computationState.getCounter());
 
 			if (computationState.getEvent() == null) {
 				target.writeBoolean(false);
@@ -963,6 +964,7 @@ public class NFA implements Serializable {
 				long timestamp = timestampSerializer.deserialize(source);
 				DeweyNumber version = versionSerializer.deserialize(source);
 				long startTimestamp = timestampSerializer.deserialize(source);
+				int counter = source.readInt();
 
 				T event = null;
 				if (source.readBoolean()) {
@@ -970,7 +972,7 @@
 				}
 
 				computationStates.add(ComputationState.createState(
-					nfa, state, prevState, event, 0, timestamp, version, startTimestamp));
+					nfa, state, prevState, event, counter, timestamp, version, startTimestamp));
 			}
 
 			nfa.computationStates = computationStates;
@@ -1028,6 +1030,9 @@
 			long startTimestamp = timestampSerializer.deserialize(source);
 			timestampSerializer.serialize(startTimestamp, target);
 
+			int counter = source.readInt();
+			target.writeInt(counter);
+
 			boolean hasEvent = source.readBoolean();
 			target.writeBoolean(hasEvent);
 			if (hasEvent) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e321367/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 41593b0..95e3a37 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,6 +43,7 @@ import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -139,6 +140,8 @@ public class CEPOperatorTest extends TestLogger {
 	}
 
 	@Test
+	@Ignore
+	// TODO: 5/19/17 Re-instate when checkpoints are fixed
 	public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws Exception {
 		String rocksDbPath = tempFolder.new
[2/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook
[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied

This closes #3933.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ad08163
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ad08163
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ad08163

Branch: refs/heads/master
Commit: 2ad081636e54a2b8fd98a935a95c0949818843ad
Parents: acea4cd
Author: Wright, Eron
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann
Committed: Fri May 19 15:22:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 108 +++
 .../executiongraph/ExecutionGraphBuilder.java   |  15 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 131 +++
 3 files changed, 251 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/2ad08163/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.checkpoint.hooks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
 	}
 
 	// ------------------------------------------------------------------------
+	//  hook management
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Wraps a hook such that the user-code classloader is applied when the hook is invoked.
+	 * @param hook the hook to wrap
+	 * @param userClassLoader the classloader to use
+	 */
+	public static MasterTriggerRestoreHook wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+		return new WrappedMasterHook(hook, userClassLoader);
+	}
+
+	@VisibleForTesting
+	static class WrappedMasterHook implements MasterTriggerRestoreHook {
+
+		private final MasterTriggerRestoreHook hook;
+		private final ClassLoader userClassLoader;
+
+		WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+			this.hook = hook;
+			this.userClassLoader = userClassLoader;
+		}
+
+		@Override
+		public String getIdentifier() {
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.getIdentifier();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Nullable
+		@Override
+		public Future triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+			Executor wrappedExecutor = new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.execute(new WrappedCommand(command));
+				}
+			};
+
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
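The wrapper's core idiom is save/set/restore of the thread context classloader around every delegated call, with the restore in a finally block so an exception cannot leak the user classloader onto the calling thread. A minimal, generic Java sketch of that idiom (a standalone helper, not the MasterHooks code itself):

    import java.util.concurrent.Callable;

    public final class ContextClassLoaders {

        // Runs 'action' with 'classLoader' as the thread's context classloader,
        // restoring the previous loader in a finally block even if the action throws.
        public static <T> T runWithContextClassLoader(ClassLoader classLoader, Callable<T> action) throws Exception {
            final Thread thread = Thread.currentThread();
            final ClassLoader original = thread.getContextClassLoader();
            thread.setContextClassLoader(classLoader);
            try {
                return action.call();
            } finally {
                thread.setContextClassLoader(original);
            }
        }

        public static void main(String[] args) throws Exception {
            // Stand-in for a user-code classloader.
            ClassLoader userLoader = ContextClassLoaders.class.getClassLoader();
            String id = runWithContextClassLoader(userLoader, () -> "my-hook-id");
            System.out.println(id);
        }
    }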
[1/2] flink git commit: [FLINK-6606] Hide WrapperMasterHook by making it private
Repository: flink
Updated Branches:
  refs/heads/master acea4cde5 -> 8814ba767

[FLINK-6606] Hide WrapperMasterHook by making it private

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8814ba76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8814ba76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8814ba76

Branch: refs/heads/master
Commit: 8814ba767fc9af610d1c799338fa4168afc4f3fb
Parents: 2ad0816
Author: Till Rohrmann
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann
Committed: Fri May 19 15:22:49 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 41 +++-
 .../executiongraph/ExecutionGraphBuilder.java   |  5 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 11 ++
 3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/8814ba76/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.checkpoint.hooks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
 
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
 	 * @param userClassLoader the classloader to use
 	 */
 	public static MasterTriggerRestoreHook wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-		return new WrappedMasterHook(hook, userClassLoader);
+		return new WrappedMasterHook<>(hook, userClassLoader);
 	}
 
-	@VisibleForTesting
-	static class WrappedMasterHook implements MasterTriggerRestoreHook {
+	private static class WrappedMasterHook implements MasterTriggerRestoreHook {
 
 		private final MasterTriggerRestoreHook hook;
 		private final ClassLoader userClassLoader;
 
 		WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-			this.hook = hook;
-			this.userClassLoader = userClassLoader;
+			this.hook = Preconditions.checkNotNull(hook);
+			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
 		}
 
 		@Override
 		public String getIdentifier() {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.getIdentifier();
 			}
@@ -315,9 +315,10 @@
 			}
 		};
 
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
 			}
@@ -328,9 +329,10 @@ public class MasterHooks {
 
 		@Override
 		public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader or
[2/2] flink git commit: [FLINK-6606] Hide WrapperMasterHook by making it private
[FLINK-6606] Hide WrapperMasterHook by making it private

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6a596fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6a596fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6a596fe

Branch: refs/heads/release-1.3
Commit: f6a596fe3f8924602c34636959e3d7e48cb2262f
Parents: 3dba48e
Author: Till Rohrmann
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann
Committed: Fri May 19 15:21:24 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 41 +++-
 .../executiongraph/ExecutionGraphBuilder.java   |  5 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 11 ++
 3 files changed, 29 insertions(+), 28 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
 package org.apache.flink.runtime.checkpoint.hooks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
 
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
 	 * @param userClassLoader the classloader to use
 	 */
 	public static MasterTriggerRestoreHook wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-		return new WrappedMasterHook(hook, userClassLoader);
+		return new WrappedMasterHook<>(hook, userClassLoader);
 	}
 
-	@VisibleForTesting
-	static class WrappedMasterHook implements MasterTriggerRestoreHook {
+	private static class WrappedMasterHook implements MasterTriggerRestoreHook {
 
 		private final MasterTriggerRestoreHook hook;
 		private final ClassLoader userClassLoader;
 
 		WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-			this.hook = hook;
-			this.userClassLoader = userClassLoader;
+			this.hook = Preconditions.checkNotNull(hook);
+			this.userClassLoader = Preconditions.checkNotNull(userClassLoader);
 		}
 
 		@Override
 		public String getIdentifier() {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.getIdentifier();
 			}
@@ -315,9 +315,10 @@
 			}
 		};
 
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
 			thread.setContextClassLoader(userClassLoader);
+
 			try {
 				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
 			}
@@ -328,9 +329,10 @@ public class MasterHooks {
 
 		@Override
 		public void restoreCheckpoint(long checkpointId, @Nullable T checkpointData) throws Exception {
-			Thread thread = Thread.currentThread();
-			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			final Thread thread = Thread.currentThread();
+			final ClassLoader originalClassLoader = thread.getContextClassLoader();
[1/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook
Repository: flink
Updated Branches:
  refs/heads/release-1.3 6d178a959 -> f6a596fe3

[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user classloader is applied

This closes #3933.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3dba48ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3dba48ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3dba48ee

Branch: refs/heads/release-1.3
Commit: 3dba48ee6ad3dc6bdb9abfaa91accbb8581cfd2a
Parents: 6d178a9
Author: Wright, Eron
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann
Committed: Fri May 19 15:21:23 2017 +0200

----------------------------------------------------------------------
 .../runtime/checkpoint/hooks/MasterHooks.java   | 108 +++
 .../executiongraph/ExecutionGraphBuilder.java   |  15 ++-
 .../checkpoint/hooks/MasterHooksTest.java       | 131 +++
 3 files changed, 251 insertions(+), 3 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/3dba48ee/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
 package org.apache.flink.runtime.checkpoint.hooks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
 	}
 
 	// ------------------------------------------------------------------------
+	//  hook management
+	// ------------------------------------------------------------------------
+
+	/**
+	 * Wraps a hook such that the user-code classloader is applied when the hook is invoked.
+	 * @param hook the hook to wrap
+	 * @param userClassLoader the classloader to use
+	 */
+	public static MasterTriggerRestoreHook wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+		return new WrappedMasterHook(hook, userClassLoader);
+	}
+
+	@VisibleForTesting
+	static class WrappedMasterHook implements MasterTriggerRestoreHook {
+
+		private final MasterTriggerRestoreHook hook;
+		private final ClassLoader userClassLoader;
+
+		WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+			this.hook = hook;
+			this.userClassLoader = userClassLoader;
+		}
+
+		@Override
+		public String getIdentifier() {
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.getIdentifier();
+			}
+			finally {
+				thread.setContextClassLoader(originalClassLoader);
+			}
+		}
+
+		@Nullable
+		@Override
+		public Future triggerCheckpoint(long checkpointId, long timestamp, final Executor executor) throws Exception {
+			Executor wrappedExecutor = new Executor() {
+				@Override
+				public void execute(Runnable command) {
+					executor.execute(new WrappedCommand(command));
+				}
+			};
+
+			Thread thread = Thread.currentThread();
+			ClassLoader originalClassLoader = thread.getContextClassLoader();
+			thread.setContextClassLoader(userClassLoader);
+			try {
+				return hook.triggerCheckpoint(checkpointId, timestamp, wrappedExecutor);
+			}
+			finally {
+				thread.setCo
flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.
Repository: flink Updated Branches: refs/heads/release-1.3 d1cff3ae3 -> 6d178a959 [FLINK-6574] [table] Support nested catalogs in ExternalCatalog. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d178a95 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d178a95 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d178a95 Branch: refs/heads/release-1.3 Commit: 6d178a9597f5f9b79a0e3a6f4a61a10734188c85 Parents: d1cff3a Author: Haohui Mai Authored: Mon May 15 17:09:18 2017 -0700 Committer: Fabian Hueske Committed: Fri May 19 14:23:11 2017 +0200 -- .../org/apache/flink/table/api/exceptions.scala | 48 +++ .../table/catalog/CrudExternalCatalog.scala | 78 +- .../flink/table/catalog/ExternalCatalog.scala | 38 ++--- .../table/catalog/ExternalCatalogDatabase.scala | 31 .../table/catalog/ExternalCatalogSchema.scala | 91 +--- .../table/catalog/ExternalCatalogTable.scala| 16 --- .../table/catalog/InMemoryExternalCatalog.scala | 142 +++ .../flink/table/ExternalCatalogTest.scala | 33 + .../catalog/InMemoryExternalCatalogTest.scala | 103 +++--- .../flink/table/utils/CommonTestData.scala | 17 ++- 10 files changed, 259 insertions(+), 338 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6d178a95/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 760cf75..7ea17fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends RuntimeException(msg) /** * Exception for an operation on a nonexistent table * - * @param dbdatabase name - * @param table table name - * @param cause the cause + * @param catalogcatalog name + * @param table table name + * @param cause the cause */ case class TableNotExistException( -db: String, +catalog: String, table: String, cause: Throwable) -extends RuntimeException(s"Table $db.$table does not exist.", cause) { +extends RuntimeException(s"Table $catalog.$table does not exist.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** * Exception for adding an already existent table * - * @param dbdatabase name - * @param table table name - * @param cause the cause + * @param catalogcatalog name + * @param table table name + * @param cause the cause */ case class TableAlreadyExistException( -db: String, +catalog: String, table: String, cause: Throwable) -extends RuntimeException(s"Table $db.$table already exists.", cause) { +extends RuntimeException(s"Table $catalog.$table already exists.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** - * Exception for operation on a nonexistent database + * Exception for operation on a nonexistent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseNotExistException( -db: String, +case class CatalogNotExistException( +catalog: String, cause: Throwable) -extends RuntimeException(s"Database $db does not exist.", cause) { +extends RuntimeException(s"Catalog $catalog does 
not exist.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** - * Exception for adding an already existent database + * Exception for adding an already existent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseAlreadyExistException( -db: String, +case class CatalogAlreadyExistException( +catalog: String, cause: Throwable) -extends RuntimeException(s"Database $db already exists.", cause) { +extends RuntimeException(s"Catalog $catalog already exists.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/6d178a95/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala ---
flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.
Repository: flink Updated Branches: refs/heads/master 6ae759ae5 -> acea4cde5 [FLINK-6574] [table] Support nested catalogs in ExternalCatalog. This closes #3913. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acea4cde Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acea4cde Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acea4cde Branch: refs/heads/master Commit: acea4cde5f0225db9e00bbef4a47fdb58419022b Parents: 6ae759a Author: Haohui Mai Authored: Mon May 15 17:09:18 2017 -0700 Committer: Fabian Hueske Committed: Fri May 19 14:21:19 2017 +0200 -- .../org/apache/flink/table/api/exceptions.scala | 48 +++ .../table/catalog/CrudExternalCatalog.scala | 78 +- .../flink/table/catalog/ExternalCatalog.scala | 38 ++--- .../table/catalog/ExternalCatalogDatabase.scala | 31 .../table/catalog/ExternalCatalogSchema.scala | 91 +--- .../table/catalog/ExternalCatalogTable.scala| 16 --- .../table/catalog/InMemoryExternalCatalog.scala | 142 +++ .../flink/table/ExternalCatalogTest.scala | 33 + .../catalog/InMemoryExternalCatalogTest.scala | 103 +++--- .../flink/table/utils/CommonTestData.scala | 17 ++- 10 files changed, 259 insertions(+), 338 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala -- diff --git a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala index 760cf75..7ea17fa 100644 --- a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala +++ b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala @@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends RuntimeException(msg) /** * Exception for an operation on a nonexistent table * - * @param dbdatabase name - * @param table table name - * @param cause the cause + * @param catalogcatalog name + * @param table table name + * @param cause the cause */ case class TableNotExistException( -db: String, +catalog: String, table: String, cause: Throwable) -extends RuntimeException(s"Table $db.$table does not exist.", cause) { +extends RuntimeException(s"Table $catalog.$table does not exist.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** * Exception for adding an already existent table * - * @param dbdatabase name - * @param table table name - * @param cause the cause + * @param catalogcatalog name + * @param table table name + * @param cause the cause */ case class TableAlreadyExistException( -db: String, +catalog: String, table: String, cause: Throwable) -extends RuntimeException(s"Table $db.$table already exists.", cause) { +extends RuntimeException(s"Table $catalog.$table already exists.", cause) { - def this(db: String, table: String) = this(db, table, null) + def this(catalog: String, table: String) = this(catalog, table, null) } /** - * Exception for operation on a nonexistent database + * Exception for operation on a nonexistent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseNotExistException( -db: String, +case class CatalogNotExistException( +catalog: String, cause: Throwable) -extends RuntimeException(s"Database $db does not exist.", cause) { +extends RuntimeException(s"Catalog 
$catalog does not exist.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** - * Exception for adding an already existent database + * Exception for adding an already existent catalog * - * @param db database name + * @param catalog catalog name * @param cause the cause */ -case class DatabaseAlreadyExistException( -db: String, +case class CatalogAlreadyExistException( +catalog: String, cause: Throwable) -extends RuntimeException(s"Database $db already exists.", cause) { +extends RuntimeException(s"Catalog $catalog already exists.", cause) { - def this(db: String) = this(db, null) + def this(catalog: String) = this(catalog, null) } /** http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala -
flink git commit: [FLINK-6582] [docs] Project from maven archetype is not buildable by default
Repository: flink Updated Branches: refs/heads/release-1.3 f62004079 -> d1cff3ae3 [FLINK-6582] [docs] Project from maven archetype is not buildable by default The pom.xml for flink-quickstart-java and flink-quickstart-scala must specify scala.version and scala.binary.version. This closes #3910 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1cff3ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1cff3ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1cff3ae Branch: refs/heads/release-1.3 Commit: d1cff3ae32665263e404c30d9bc21205092123db Parents: f620040 Author: Greg Hogan Authored: Mon May 15 09:35:36 2017 -0400 Committer: Robert Metzger Committed: Fri May 19 11:45:09 2017 +0200 -- .../main/resources/archetype-resources/pom.xml| 14 ++ .../main/resources/archetype-resources/pom.xml| 5 +++-- flink-quickstart/pom.xml | 18 ++ 3 files changed, 27 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 1e525fd..b04f75f 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -19,7 +19,7 @@ under the License. http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> 4.0.0 - + ${groupId} ${artifactId} ${version} @@ -33,6 +33,7 @@ under the License. 1.3-SNAPSHOT 1.7.7 1.2.17 + @scala.binary.version@ @@ -48,9 +49,9 @@ under the License. - + - http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index 5abe496..6a5eed0 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -48,10 +48,11 @@ under the License. 1.3-SNAPSHOT 1.7.7 1.2.17 + @scala.binary.version@ http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/pom.xml -- diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml index 068bbc9..2acccd4 100644 --- a/flink-quickstart/pom.xml +++ b/flink-quickstart/pom.xml @@ -80,6 +80,24 @@ under the License. true + + + + org.apache.maven.plugins + maven-resources-plugin + + false + + @ + + + + + + src/main/resources + true + +
flink git commit: [FLINK-6582] [docs] Project from maven archetype is not buildable by default
Repository: flink Updated Branches: refs/heads/master 0162543ac -> 6ae759ae5 [FLINK-6582] [docs] Project from maven archetype is not buildable by default The pom.xml for flink-quickstart-java and flink-quickstart-scala must specify scala.version and scala.binary.version. This closes #3910 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae759ae Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae759ae Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae759ae Branch: refs/heads/master Commit: 6ae759ae5ece9cfa95bf49ddd44c17397903eaca Parents: 0162543 Author: Greg Hogan Authored: Mon May 15 09:35:36 2017 -0400 Committer: Robert Metzger Committed: Fri May 19 11:43:30 2017 +0200 -- .../main/resources/archetype-resources/pom.xml| 14 ++ .../main/resources/archetype-resources/pom.xml| 5 +++-- flink-quickstart/pom.xml | 18 ++ 3 files changed, 27 insertions(+), 10 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml index 9991d2c..44c56b9 100644 --- a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml @@ -19,7 +19,7 @@ under the License. http://maven.apache.org/POM/4.0.0"; xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd";> 4.0.0 - + ${groupId} ${artifactId} ${version} @@ -33,6 +33,7 @@ under the License. 1.4-SNAPSHOT 1.7.7 1.2.17 + @scala.binary.version@ @@ -48,9 +49,9 @@ under the License. - + - http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml -- diff --git a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml index 2139c6b..ff33a6e 100644 --- a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml +++ b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml @@ -48,10 +48,11 @@ under the License. 1.4-SNAPSHOT 1.7.7 1.2.17 + @scala.binary.version@ http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/pom.xml -- diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml index 7e1b460..ea669cd 100644 --- a/flink-quickstart/pom.xml +++ b/flink-quickstart/pom.xml @@ -80,6 +80,24 @@ under the License. true + + + + org.apache.maven.plugins + maven-resources-plugin + + false + + @ + + + + + + src/main/resources + true + +
[1/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes
Repository: flink Updated Branches: refs/heads/release-1.3 0963718ac -> f62004079 http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java new file mode 100644 index 000..0c215cd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; +import org.apache.zookeeper.data.Stat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for basic {@link ZooKeeperStateHandleStore} behaviour. 
+ * + * Tests include: + * + * Expected usage of operations + * Correct ordering of ZooKeeper and state handle operations + * + */ +public class ZooKeeperStateHandleStoreTest extends TestLogger { + + private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1); + + @AfterClass + public static void tearDown() throws Exception { + if (ZOOKEEPER != null) { + ZOOKEEPER.shutdown(); + } + } + + @Before + public void cleanUp() throws Exception { + ZOOKEEPER.deleteAll(); + } + + /** +* Tests add operation with lock. +*/ + @Test + public void testAddAndLock() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor()); + + // Config + final String pathInZooKeeper = "/testAdd"; + final Long state = 1239712317L; + + // Test + store.addAndLock(pathInZooKeeper, state); + + // Verify + // State handle created + assertEquals(1, store.getAllAndLock().size()); + assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState()); + + // Path created and is persistent + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); + assertNotNull(stat); + assertEquals(0, stat.getEphemeralOwner()); + + List children = ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper); + + // there should be one child which is the lock +
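The add/lock/release cycle that testAddAndLock exercises reduces to a short sketch. This is a hedged reduction, not the committed code: the method names addAndLock, getAndLock, and releaseAndTryRemove appear in this patch, while the RetrievableStateStorageHelper parameter mirrors the test's LongStateStorage helper.

import org.apache.curator.framework.CuratorFramework;
import org.apache.flink.runtime.concurrent.Executors;
import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
import org.apache.flink.runtime.zookeeper.ZooKeeperStateHandleStore;

public class StateHandleLockSketch {
    static void roundTrip(CuratorFramework client,
                          RetrievableStateStorageHelper<Long> storage) throws Exception {
        ZooKeeperStateHandleStore<Long> store =
            new ZooKeeperStateHandleStore<>(client, storage, Executors.directExecutor());

        // Creates the persistent ZNode plus an ephemeral lock child owned by this store.
        store.addAndLock("/jobs/job-1", 42L);

        // Reading through the store also takes a lock on the node.
        Long state = store.getAndLock("/jobs/job-1").retrieveState();

        // The node is only actually deleted once no other store instance
        // still holds a lock child under it.
        store.releaseAndTryRemove("/jobs/job-1");
    }
}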
[3/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java new file mode 100644 index 000..2a6975a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils; +import org.junit.Test; + +import java.util.Map; +import java.util.Random; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.spy; + +public class IncrementalKeyedStateHandleTest { + + /** +* This test checks, that for an unregistered {@link IncrementalKeyedStateHandle} all state +* (including shared) is discarded. +*/ + @Test + public void testUnregisteredDiscarding() throws Exception { + IncrementalKeyedStateHandle stateHandle = create(new Random(42)); + + stateHandle.discardState(); + + for (StreamStateHandle handle : stateHandle.getPrivateState().values()) { + verify(handle).discardState(); + } + + for (StreamStateHandle handle : stateHandle.getSharedState().values()) { + verify(handle).discardState(); + } + + verify(stateHandle.getMetaStateHandle()).discardState(); + } + + /** +* This test checks, that for a registered {@link IncrementalKeyedStateHandle} discards respect +* all shared state and only discard it one all references are released. +*/ + @Test + public void testSharedStateDeRegistration() throws Exception { + + Random rnd = new Random(42); + + SharedStateRegistry registry = spy(new SharedStateRegistry()); + + // Create two state handles with overlapping shared state + IncrementalKeyedStateHandle stateHandle1 = create(new Random(42)); + IncrementalKeyedStateHandle stateHandle2 = create(new Random(42)); + + // Both handles should not be registered and not discarded by now. 
+ for (Map.Entry entry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + for (Map.Entry entry : + stateHandle2.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + // Now we register both ... + stateHandle1.registerSharedStates(registry); + stateHandle2.registerSharedStates(registry); + + for (Map.Entry stateHandleEntry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey()); + + verify(registry).registerReference( + registryKey, + stateHandleEntry.getValue()); + } + + for (Map.Entry stateHandleEntry : +
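The reference counting this test verifies can be summarized in a short hedged sketch: two incremental handles built from the same seed share the same sst file names, so each shared file is only truly discarded when the last registered handle releases it. The handle parameters stand in for the test's private create(new Random(42)) factory; the calls themselves follow the test above.

import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;

public class SharedStateRefCountSketch {
    // handle1 and handle2 are assumed to reference the same shared sst
    // file names, mirroring the test setup with an identical Random seed.
    static void lifecycle(IncrementalKeyedStateHandle handle1,
                          IncrementalKeyedStateHandle handle2) throws Exception {
        SharedStateRegistry registry = new SharedStateRegistry();

        handle1.registerSharedStates(registry); // ref count per shared key: 1
        handle2.registerSharedStates(registry); // ref count per shared key: 2

        handle1.discardState(); // releases its references; shared files survive
        handle2.discardState(); // last reference released -> shared files discarded
    }
}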
[2/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes In order to guard against deletions of ZooKeeper nodes which are still being used by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism. Only after all ZooKeeperStateHandleStores have released their locks is the ZNode allowed to be deleted. The locking mechanism is implemented via ephemeral child nodes of the respective ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thus protecting it from being deleted, it creates an ephemeral child node. The node's name is unique to the ZooKeeperStateHandleStore instance. The delete operations will then only delete the node if no child nodes are associated with it. In order to guard against orphaned lock nodes, they are created as ephemeral nodes. This means that they will be deleted by ZooKeeper once the connection of the ZooKeeper client which created them times out. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f58fec70 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f58fec70 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f58fec70 Branch: refs/heads/release-1.3 Commit: f58fec70fef12056bd58b6cc2985532ccb07625e Parents: 0963718 Author: Till Rohrmann Authored: Wed May 17 14:52:04 2017 +0200 Committer: Stefan Richter Committed: Fri May 19 11:00:07 2017 +0200 -- .../store/ZooKeeperMesosWorkerStore.java| 8 +- .../ZooKeeperCompletedCheckpointStore.java | 150 ++-- .../ZooKeeperSubmittedJobGraphStore.java| 50 +- .../zookeeper/ZooKeeperStateHandleStore.java| 419 +++--- .../CompletedCheckpointStoreTest.java | 9 + ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++- .../ZooKeeperCompletedCheckpointStoreTest.java | 11 +- .../ZooKeeperStateHandleStoreITCase.java| 642 --- .../ZooKeeperStateHandleStoreTest.java | 805 +++ 9 files changed, 1345 insertions(+), 882 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java -- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java index 42abd4c..663ce56 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java @@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { totalTaskCountInZooKeeper.close(); if(cleanup) { - workersInZooKeeper.removeAndDiscardAllState(); + workersInZooKeeper.releaseAndTryRemoveAll(); } isRunning = false; @@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { synchronized (startStopLock) { verifyIsRunning(); - List, String>> handles = workersInZooKeeper.getAll(); + List, String>> handles = workersInZooKeeper.getAllAndLock(); if(handles.size() != 0) { List workers = new ArrayList<>(handles.size()); @@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { int currentVersion = workersInZooKeeper.exists(path); if (currentVersion == -1) { try { - workersInZooKeeper.add(path, worker); + workersInZooKeeper.addAndLock(path, worker); LOG.debug("Added {} in ZooKeeper.", worker); } catch (KeeperException.NodeExistsException ex) { throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex); @@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { return false; } - workersInZooKeeper.removeAndDiscardState(path); + workersInZooKeeper.releaseAndTryRemove(path); LOG.debug("Removed worker {} from ZooKeeper.", taskID
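Stripped of the store's bookkeeping, the locking scheme described in the commit message comes down to two Curator operations. The sketch below is an illustrative reduction, not the committed implementation; in particular it glosses over the race between the child check and the delete, which a real implementation must tolerate (a delete of a node that regained a lock child simply fails).

import org.apache.curator.framework.CuratorFramework;
import org.apache.zookeeper.CreateMode;

public final class ZNodeLockSketch {

    // Lock: add an ephemeral child whose name is unique to this store instance.
    // ZooKeeper removes it automatically once the client's session times out,
    // which is what guards against orphaned lock nodes.
    static void lock(CuratorFramework client, String zNode, String storeId) throws Exception {
        client.create().withMode(CreateMode.EPHEMERAL).forPath(zNode + "/" + storeId);
    }

    // Release and try to remove: drop our lock child, then delete the ZNode
    // only if no other store instance still holds a lock child under it.
    static void releaseAndTryRemove(CuratorFramework client, String zNode, String storeId) throws Exception {
        client.delete().forPath(zNode + "/" + storeId);
        if (client.getChildren().forPath(zNode).isEmpty()) {
            client.delete().forPath(zNode);
        }
    }
}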
[4/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6200407 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6200407 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6200407 Branch: refs/heads/release-1.3 Commit: f6200407979fb6987f86d7029df81b345f0d9525 Parents: f58fec7 Author: Stefan Richter Authored: Tue May 16 12:32:05 2017 +0200 Committer: Stefan Richter Committed: Fri May 19 11:01:12 2017 +0200 -- .../state/RocksDBKeyedStateBackend.java | 56 +-- .../state/RocksDBStateBackendTest.java | 88 - .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++- .../flink/runtime/checkpoint/OperatorState.java | 7 - .../checkpoint/OperatorSubtaskState.java| 11 - .../StandaloneCompletedCheckpointStore.java | 4 +- .../flink/runtime/checkpoint/SubtaskState.java | 11 - .../flink/runtime/checkpoint/TaskState.java | 7 - .../ZooKeeperCompletedCheckpointStore.java | 149 +++- .../savepoint/SavepointV2Serializer.java| 17 +- .../runtime/state/CompositeStateHandle.java | 15 +- .../state/IncrementalKeyedStateHandle.java | 171 - .../runtime/state/KeyGroupsStateHandle.java | 5 - .../state/PlaceholderStreamStateHandle.java | 44 +-- .../runtime/state/SharedStateRegistry.java | 54 +-- .../state/memory/ByteStreamStateHandle.java | 7 + .../checkpoint/CheckpointCoordinatorTest.java | 25 -- .../CompletedCheckpointStoreTest.java | 61 +-- .../checkpoint/CompletedCheckpointTest.java | 3 - .../checkpoint/PendingCheckpointTest.java | 1 - ...ZooKeeperCompletedCheckpointStoreITCase.java | 7 +- .../savepoint/CheckpointTestUtils.java | 25 +- .../state/IncrementalKeyedStateHandleTest.java | 206 ++ .../runtime/state/SharedStateRegistryTest.java | 14 +- .../runtime/state/StateBackendTestBase.java | 2 - .../RecoverableCompletedCheckpointStore.java| 5 +- ...tractEventTimeWindowCheckpointingITCase.java | 9 +- .../JobManagerHACheckpointRecoveryITCase.java | 375 +-- 28 files changed, 747 insertions(+), 776 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 88a759d..1f32a89 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -105,6 +105,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { /** True if incremental checkpointing is enabled */ private final boolean enableIncrementalCheckpointing; - /** The sst files materialized in pending checkpoints */ - private final SortedMap> materializedSstFiles = new TreeMap<>(); + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ + private final SortedMap> materializedSstFiles = new TreeMap<>(); /** The identifier of 
the last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private final long checkpointTimestamp; /** All sst files that were part of the last previously completed checkpoint */ - private Map baseSstFiles; + private Set baseSstFiles; /** The state meta data */ private final List> stateMetaInfoSnapshots = new ArrayList<>(); @@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private final CloseableRegistry closeableRegistry = new CloseableRegistry(); // new sst files since the last completed checkpoint - private final Map newSstFiles = new HashMap<>(); - - // old sst files which have been materialized in pre
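The ordering invariant in the commit title is easy to state as a sketch: shared state must join the SharedStateRegistry before the checkpoint becomes visible in the CompletedCheckpointStore, so a concurrent recovery or subsumption can never observe a checkpoint whose shared files are still unreferenced. This is a hedged illustration, not the committed code: the loop over handles stands in for the registration the patch wires into the store itself, using the handle-level registerSharedStates(registry) call shown in this diff.

import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
import org.apache.flink.runtime.state.IncrementalKeyedStateHandle;
import org.apache.flink.runtime.state.SharedStateRegistry;

public class RegisterThenAddSketch {
    static void complete(CompletedCheckpoint checkpoint,
                         Iterable<IncrementalKeyedStateHandle> handles,
                         SharedStateRegistry registry,
                         CompletedCheckpointStore store) throws Exception {
        // 1. Take references on all shared state first ...
        for (IncrementalKeyedStateHandle handle : handles) {
            handle.registerSharedStates(registry);
        }
        // 2. ... and only then publish the checkpoint for recovery and
        //    subsumption, so no reader sees unregistered shared files.
        store.addCheckpoint(checkpoint);
    }
}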
[2/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0162543a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0162543a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0162543a Branch: refs/heads/master Commit: 0162543ac13f048ef67a6586d8a6e8021ec9dcd4 Parents: 3d119e1 Author: Stefan Richter Authored: Tue May 16 12:32:05 2017 +0200 Committer: Stefan Richter Committed: Fri May 19 10:57:32 2017 +0200 -- .../state/RocksDBKeyedStateBackend.java | 56 +-- .../state/RocksDBStateBackendTest.java | 88 - .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++- .../flink/runtime/checkpoint/OperatorState.java | 7 - .../checkpoint/OperatorSubtaskState.java| 11 - .../StandaloneCompletedCheckpointStore.java | 4 +- .../flink/runtime/checkpoint/SubtaskState.java | 11 - .../flink/runtime/checkpoint/TaskState.java | 7 - .../ZooKeeperCompletedCheckpointStore.java | 149 +++- .../savepoint/SavepointV2Serializer.java| 17 +- .../runtime/state/CompositeStateHandle.java | 15 +- .../state/IncrementalKeyedStateHandle.java | 171 - .../runtime/state/KeyGroupsStateHandle.java | 5 - .../state/PlaceholderStreamStateHandle.java | 44 +-- .../runtime/state/SharedStateRegistry.java | 54 +-- .../state/memory/ByteStreamStateHandle.java | 7 + .../checkpoint/CheckpointCoordinatorTest.java | 25 -- .../CompletedCheckpointStoreTest.java | 61 +-- .../checkpoint/CompletedCheckpointTest.java | 3 - .../checkpoint/PendingCheckpointTest.java | 1 - ...ZooKeeperCompletedCheckpointStoreITCase.java | 7 +- .../savepoint/CheckpointTestUtils.java | 25 +- .../state/IncrementalKeyedStateHandleTest.java | 206 ++ .../runtime/state/SharedStateRegistryTest.java | 14 +- .../runtime/state/StateBackendTestBase.java | 2 - .../RecoverableCompletedCheckpointStore.java| 5 +- ...tractEventTimeWindowCheckpointingITCase.java | 9 +- .../JobManagerHACheckpointRecoveryITCase.java | 375 +-- 28 files changed, 747 insertions(+), 776 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java -- diff --git a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java index 88a759d..1f32a89 100644 --- a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java +++ b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java @@ -105,6 +105,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.PriorityQueue; +import java.util.Set; import java.util.SortedMap; import java.util.TreeMap; import java.util.UUID; @@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { /** True if incremental checkpointing is enabled */ private final boolean enableIncrementalCheckpointing; - /** The sst files materialized in pending checkpoints */ - private final SortedMap> materializedSstFiles = new TreeMap<>(); + /** The state handle ids of all sst files materialized in snapshots for previous checkpoints */ + private final SortedMap> materializedSstFiles = new TreeMap<>(); /** The identifier of the 
last completed checkpoint */ private long lastCompletedCheckpointId = -1; @@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private final long checkpointTimestamp; /** All sst files that were part of the last previously completed checkpoint */ - private Map baseSstFiles; + private Set baseSstFiles; /** The state meta data */ private final List> stateMetaInfoSnapshots = new ArrayList<>(); @@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend extends AbstractKeyedStateBackend { private final CloseableRegistry closeableRegistry = new CloseableRegistry(); // new sst files since the last completed checkpoint - private final Map newSstFiles = new HashMap<>(); - - // old sst files which have been materialized in previous
[1/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore
Repository: flink Updated Branches: refs/heads/master b8f8524af -> 0162543ac http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java new file mode 100644 index 000..2a6975a --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -0,0 +1,206 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils; +import org.junit.Test; + +import java.util.Map; +import java.util.Random; + +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.powermock.api.mockito.PowerMockito.spy; + +public class IncrementalKeyedStateHandleTest { + + /** +* This test checks, that for an unregistered {@link IncrementalKeyedStateHandle} all state +* (including shared) is discarded. +*/ + @Test + public void testUnregisteredDiscarding() throws Exception { + IncrementalKeyedStateHandle stateHandle = create(new Random(42)); + + stateHandle.discardState(); + + for (StreamStateHandle handle : stateHandle.getPrivateState().values()) { + verify(handle).discardState(); + } + + for (StreamStateHandle handle : stateHandle.getSharedState().values()) { + verify(handle).discardState(); + } + + verify(stateHandle.getMetaStateHandle()).discardState(); + } + + /** +* This test checks, that for a registered {@link IncrementalKeyedStateHandle} discards respect +* all shared state and only discard it one all references are released. +*/ + @Test + public void testSharedStateDeRegistration() throws Exception { + + Random rnd = new Random(42); + + SharedStateRegistry registry = spy(new SharedStateRegistry()); + + // Create two state handles with overlapping shared state + IncrementalKeyedStateHandle stateHandle1 = create(new Random(42)); + IncrementalKeyedStateHandle stateHandle2 = create(new Random(42)); + + // Both handles should not be registered and not discarded by now. 
+ for (Map.Entry entry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + for (Map.Entry entry : + stateHandle2.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey()); + + verify(registry, times(0)).unregisterReference(registryKey); + verify(entry.getValue(), times(0)).discardState(); + } + + // Now we register both ... + stateHandle1.registerSharedStates(registry); + stateHandle2.registerSharedStates(registry); + + for (Map.Entry stateHandleEntry : + stateHandle1.getSharedState().entrySet()) { + + SharedStateRegistryKey registryKey = + stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey()); + + verify(registry).registerReference( + registryKey, + stateHandleEntry.getVa
[3/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes
http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java -- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java new file mode 100644 index 000..0c215cd --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java @@ -0,0 +1,805 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.zookeeper; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; +import org.apache.flink.runtime.concurrent.Executors; +import org.apache.flink.runtime.state.RetrievableStateHandle; +import org.apache.flink.runtime.util.ZooKeeperUtils; +import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.TestLogger; +import org.apache.zookeeper.data.Stat; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.concurrent.CountDownLatch; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.spy; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +/** + * Tests for basic {@link ZooKeeperStateHandleStore} behaviour. + * + * Tests include: + * + * Expected usage of operations + * Correct ordering of ZooKeeper and state handle operations + * + */ +public class ZooKeeperStateHandleStoreTest extends TestLogger { + + private static final ZooKeeperTestEnvironment ZOOKEEPER = new ZooKeeperTestEnvironment(1); + + @AfterClass + public static void tearDown() throws Exception { + if (ZOOKEEPER != null) { + ZOOKEEPER.shutdown(); + } + } + + @Before + public void cleanUp() throws Exception { + ZOOKEEPER.deleteAll(); + } + + /** +* Tests add operation with lock. 
+*/ + @Test + public void testAddAndLock() throws Exception { + LongStateStorage longStateStorage = new LongStateStorage(); + ZooKeeperStateHandleStore store = new ZooKeeperStateHandleStore( + ZOOKEEPER.getClient(), longStateStorage, Executors.directExecutor()); + + // Config + final String pathInZooKeeper = "/testAdd"; + final Long state = 1239712317L; + + // Test + store.addAndLock(pathInZooKeeper, state); + + // Verify + // State handle created + assertEquals(1, store.getAllAndLock().size()); + assertEquals(state, store.getAndLock(pathInZooKeeper).retrieveState()); + + // Path created and is persistent + Stat stat = ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper); + assertNotNull(stat); + assertEquals(0, stat.getEphemeralOwner()); + + List children = ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper); + + // there should be one child which is the lock + assertEquals(1, children.size()); + + stat = ZOOKEEPER.getClien
[4/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes In order to guard against deletions of ZooKeeper nodes which are still being used by a different ZooKeeperStateHandleStore, we have to introduce a locking mechanism. Only after all ZooKeeperStateHandleStores have released their locks is the ZNode allowed to be deleted. The locking mechanism is implemented via ephemeral child nodes of the respective ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, thus protecting it from being deleted, it creates an ephemeral child node. The node's name is unique to the ZooKeeperStateHandleStore instance. The delete operations will then only delete the node if no child nodes are associated with it. In order to guard against orphaned lock nodes, they are created as ephemeral nodes. This means that they will be deleted by ZooKeeper once the connection of the ZooKeeper client which created them times out. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d119e11 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d119e11 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d119e11 Branch: refs/heads/master Commit: 3d119e1155aa8930cc7b18a085d6790cb2c63b70 Parents: b8f8524 Author: Till Rohrmann Authored: Wed May 17 14:52:04 2017 +0200 Committer: Stefan Richter Committed: Fri May 19 10:57:32 2017 +0200 -- .../store/ZooKeeperMesosWorkerStore.java| 8 +- .../ZooKeeperCompletedCheckpointStore.java | 150 ++-- .../ZooKeeperSubmittedJobGraphStore.java| 50 +- .../zookeeper/ZooKeeperStateHandleStore.java| 419 +++--- .../CompletedCheckpointStoreTest.java | 9 + ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++- .../ZooKeeperCompletedCheckpointStoreTest.java | 11 +- .../ZooKeeperStateHandleStoreITCase.java| 642 --- .../ZooKeeperStateHandleStoreTest.java | 805 +++ 9 files changed, 1345 insertions(+), 882 deletions(-) -- http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java -- diff --git a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java index 42abd4c..663ce56 100644 --- a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java +++ b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java @@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { totalTaskCountInZooKeeper.close(); if(cleanup) { - workersInZooKeeper.removeAndDiscardAllState(); + workersInZooKeeper.releaseAndTryRemoveAll(); } isRunning = false; @@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { synchronized (startStopLock) { verifyIsRunning(); - List, String>> handles = workersInZooKeeper.getAll(); + List, String>> handles = workersInZooKeeper.getAllAndLock(); if(handles.size() != 0) { List workers = new ArrayList<>(handles.size()); @@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { int currentVersion = workersInZooKeeper.exists(path); if (currentVersion == -1) { try { - workersInZooKeeper.add(path, worker); + workersInZooKeeper.addAndLock(path, worker); LOG.debug("Added {} in ZooKeeper.", worker); } catch (KeeperException.NodeExistsException ex) { throw new ConcurrentModificationException("ZooKeeper unexpectedly modified", ex); @@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements MesosWorkerStore { return false; } - workersInZooKeeper.removeAndDiscardState(path); + workersInZooKeeper.releaseAndTryRemove(path); LOG.debug("Removed worker {} from ZooKeeper.", taskID);