buildbot failure in on flink-docs-release-0.9

2017-05-19 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-0.9 while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-0.9/builds/695

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-0.9' 
triggered this build
Build Source Stamp: [branch release-0.9] HEAD
Blamelist: 

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot





buildbot failure in on flink-docs-release-1.0

2017-05-19 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.0 while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.0/builds/454

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.0' 
triggered this build
Build Source Stamp: [branch release-1.0] HEAD
Blamelist: 

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot





buildbot failure in on flink-docs-release-1.1

2017-05-19 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.1 while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.1/builds/300

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.1' 
triggered this build
Build Source Stamp: [branch release-1.1] HEAD
Blamelist: 

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot





buildbot failure in on flink-docs-release-1.2

2017-05-19 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-release-1.2 while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-release-1.2/builds/124

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave2_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-release-1.2' 
triggered this build
Build Source Stamp: [branch release-1.2] HEAD
Blamelist: 

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot





buildbot failure in on flink-docs-master

2017-05-19 Thread buildbot
The Buildbot has detected a new failure on builder flink-docs-master while 
building . Full details are available at:
https://ci.apache.org/builders/flink-docs-master/builds/717

Buildbot URL: https://ci.apache.org/

Buildslave for this Build: bb_slave3_ubuntu

Build Reason: The Nightly scheduler named 'flink-nightly-docs-master' triggered 
this build
Build Source Stamp: [branch master] HEAD
Blamelist: 

BUILD FAILED: failed Flink docs

Sincerely,
 -The Buildbot





[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

2017-05-19 Thread chesnay
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/54b88d71
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/54b88d71
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/54b88d71

Branch: refs/heads/release-1.3
Commit: 54b88d71c78a670762e152b337c04a2e68e8481d
Parents: 17a4bb1
Author: zentol 
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:09:08 2017 +0200

--
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/54b88d71/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor {
private long lastEmittedWatermark1;
private long lastEmittedWatermark2;
 
+   private Counter numRecordsIn;
+
private boolean isFinished;
 
@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor {
if (isFinished) {
return false;
}
+   if (numRecordsIn == null) {
+   numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+   }
 
while (true) {
if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor {
else {
StreamRecord 
record = recordOrWatermark.asRecord();
synchronized (lock) {
+   
numRecordsIn.inc();

streamOperator.setKeyContextElement1(record);

streamOperator.processElement1(record);
}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor {
else {
StreamRecord 
record = recordOrWatermark.asRecord();
synchronized (lock) {
+   
numRecordsIn.inc();

streamOperator.setKeyContextElement2(record);

streamOperator.processElement2(record);
}



[3/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints

2017-05-19 Thread chesnay
[hotfix][rat] Add exclusion for all test snapshots/savepoints

This closes #3854.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17a4bb14
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17a4bb14
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17a4bb14

Branch: refs/heads/release-1.3
Commit: 17a4bb14821fd107131e1051b57326e43d660195
Parents: c62553c
Author: zentol 
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:09:08 2017 +0200

--
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   6 +++---
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |   8 
 ...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
 ...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
 ...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
 .../FlinkKinesisConsumerMigrationTest.java  |   2 +-
 ...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
 ...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
 .../cep/operator/CEPMigration11to13Test.java|  10 +-
 .../test/resources/cep-branching-1_2-snapshot   | Bin 0 -> 6736 bytes
 .../test/resources/cep-branching-snapshot-1.2   | Bin 6736 -> 0 bytes
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 0 -> 5612 bytes
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 5612 -> 0 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 0 -> 3274 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 3274 -> 0 bytes
 .../resources/cep-single-pattern-1.2-snapshot   | Bin 0 -> 3311 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 3311 -> 0 bytes
 .../test/resources/cep-starting-1.2-snapshot| Bin 0 -> 6526 bytes
 .../test/resources/cep-starting-snapshot-1.2| Bin 6526 -> 0 bytes
 ...atefulJobSavepointFrom11MigrationITCase.java |   4 ++--
 ...atefulJobSavepointFrom12MigrationITCase.java |   4 ++--
 ...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
 pom.xml |  19 ++-
 27 files changed, 19 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/17a4bb14/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
-   
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+   
getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
 
// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
-   
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+   
getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
 
-   // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+   // the expected state in 
"kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
// since the state is empty, the consumer should reflect on the 
startup mode to determine start offsets.
  

[1/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

2017-05-19 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/release-1.3 c62553c00 -> 5fde739fd


[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5fde739f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5fde739f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5fde739f

Branch: refs/heads/release-1.3
Commit: 5fde739fd2b040a90d42a6a73f1d119648e863a7
Parents: c3ab5c8
Author: zjureel 
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol 
Committed: Fri May 19 21:09:08 2017 +0200

--
 .../streaming/util/OperatorSnapshotUtil.java| 162 ++-
 1 file changed, 82 insertions(+), 80 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5fde739f/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
-   DataOutputStream dos = new DataOutputStream(out);
-
-   dos.writeInt(state.getOperatorChainIndex());
-
-   
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
-
-   Collection rawOperatorState = 
state.getRawOperatorState();
-   if (rawOperatorState != null) {
-   dos.writeInt(rawOperatorState.size());
-   for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
-   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   
+   try (DataOutputStream dos = new DataOutputStream(out)) {
+
+   dos.writeInt(state.getOperatorChainIndex());
+
+   
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
+
+   Collection rawOperatorState = 
state.getRawOperatorState();
+   if (rawOperatorState != null) {
+   dos.writeInt(rawOperatorState.size());
+   for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
+   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   }
+   } else {
+   // this means no states, not even an empty list
+   dos.writeInt(-1);
}
-   } else {
-   // this means no states, not even an empty list
-   dos.writeInt(-1);
-   }
 
-   Collection managedOperatorState = 
state.getManagedOperatorState();
-   if (managedOperatorState != null) {
-   dos.writeInt(managedOperatorState.size());
-   for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
-   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   Collection managedOperatorState = 
state.getManagedOperatorState();
+   if (managedOperatorState != null) {
+   dos.writeInt(managedOperatorState.size());
+   for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
+   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   }
+   } else {
+   // this means no states, not even an empty list
+   dos.writeInt(-1);
}
-   } else {
-   // this means no states, not even an empty list
-   dos.writeInt(-1);
-   }
 
-   Collection rawKeyedState = 
state.getRawKeyedState();
-   if (rawKeyedState != null) {
-   dos.writeInt(rawKeyedState.size());
-   for (KeyedStateHandle keyedStateHandle :

[4/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min for local channels

2017-05-19 Thread chesnay
[FLINK-6586] InputGateMetrics return 0 as min for local channels


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c3ab5c82
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c3ab5c82
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c3ab5c82

Branch: refs/heads/release-1.3
Commit: c3ab5c8253b32bddc3fb9bf0c1085813e7f97e2f
Parents: 5d1cda5
Author: zentol 
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:09:08 2017 +0200

--
 .../io/network/partition/consumer/InputGateMetrics.java   | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c3ab5c82/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
int min = Integer.MAX_VALUE;
 
Collection channels = 
inputGate.getInputChannels().values();
-   if (channels.isEmpty()) {
-   // meaningful value when no channels exist:
-   return 0;
-   }
 
for (InputChannel channel : channels) {
if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
}
}
 
+   if (min == Integer.MAX_VALUE) { // in case all channels are 
local, or the channel collection was empty
+   return 0;
+   }
return min;
}
 



[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs

2017-05-19 Thread chesnay
[FLINK-6639][docs] fix code tabs in CEP docs


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/5d1cda52
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/5d1cda52
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/5d1cda52

Branch: refs/heads/release-1.3
Commit: 5d1cda52e58b53a0ae9b3e9a691102617a475aff
Parents: 54b88d7
Author: David Anderson 
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:09:08 2017 +0200

--
 docs/dev/libs/cep.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/5d1cda52/docs/dev/libs/cep.md
--
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern nonStrictNext = 
start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
 
+
 
 For non-strict contiguity one can specify if only the first succeeding 
matching event will be matched, or
 all. In the latter case multiple matches will be emitted for the same 
beginning.



[3/5] flink git commit: [FLINK-6586] InputGateMetrics return 0 as min for local channels

2017-05-19 Thread chesnay
[FLINK-6586] InputGateMetrics return 0 as min for local channels

This closes #3907.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/17ec6f02
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/17ec6f02
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/17ec6f02

Branch: refs/heads/master
Commit: 17ec6f020b779efe9152456f4ef33f6f802e4f67
Parents: fadc026
Author: zentol 
Authored: Mon May 15 13:56:06 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:08:34 2017 +0200

--
 .../io/network/partition/consumer/InputGateMetrics.java   | 7 +++
 1 file changed, 3 insertions(+), 4 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/17ec6f02/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
index 796a6db..69af455 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/InputGateMetrics.java
@@ -72,10 +72,6 @@ public class InputGateMetrics {
int min = Integer.MAX_VALUE;
 
Collection channels = 
inputGate.getInputChannels().values();
-   if (channels.isEmpty()) {
-   // meaningful value when no channels exist:
-   return 0;
-   }
 
for (InputChannel channel : channels) {
if (channel instanceof RemoteInputChannel) {
@@ -86,6 +82,9 @@ public class InputGateMetrics {
}
}
 
+   if (min == Integer.MAX_VALUE) { // in case all channels are 
local, or the channel collection was empty
+   return 0;
+   }
return min;
}
 



[1/5] flink git commit: [hotfix][rat] Add exclusion for all test snapshots/savepoints

2017-05-19 Thread chesnay
Repository: flink
Updated Branches:
  refs/heads/master 8e3213678 -> 65fdadac8


[hotfix][rat] Add exclusion for all test snapshots/savepoints

This closes #3854.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4b485307
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4b485307
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4b485307

Branch: refs/heads/master
Commit: 4b485307800d04af460fec29f6f2b34b1ff189b1
Parents: 8e32136
Author: zentol 
Authored: Wed May 10 16:50:18 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:08:33 2017 +0200

--
 ...inkKafkaConsumerBaseFrom11MigrationTest.java |   6 +++---
 ...inkKafkaConsumerBaseFrom12MigrationTest.java |   8 
 ...migration-test-flink1.1-empty-state-snapshot | Bin 0 -> 468 bytes
 ...migration-test-flink1.1-snapshot-empty-state | Bin 468 -> 0 bytes
 ...migration-test-flink1.2-empty-state-snapshot | Bin 0 -> 240 bytes
 ...migration-test-flink1.2-snapshot-empty-state | Bin 240 -> 0 bytes
 .../FlinkKinesisConsumerMigrationTest.java  |   2 +-
 ...sumer-migration-test-flink1.1-empty-snapshot | Bin 0 -> 468 bytes
 ...sumer-migration-test-flink1.1-snapshot-empty | Bin 468 -> 0 bytes
 .../cep/operator/CEPMigration11to13Test.java|  10 +-
 .../test/resources/cep-branching-1_2-snapshot   | Bin 0 -> 6736 bytes
 .../test/resources/cep-branching-snapshot-1.2   | Bin 6736 -> 0 bytes
 .../src/test/resources/cep-keyed-1_1-snapshot   | Bin 0 -> 5612 bytes
 .../src/test/resources/cep-keyed-snapshot-1.1   | Bin 5612 -> 0 bytes
 .../test/resources/cep-non-keyed-1.1-snapshot   | Bin 0 -> 3274 bytes
 .../test/resources/cep-non-keyed-snapshot-1.1   | Bin 3274 -> 0 bytes
 .../resources/cep-single-pattern-1.2-snapshot   | Bin 0 -> 3311 bytes
 .../resources/cep-single-pattern-snapshot-1.2   | Bin 3311 -> 0 bytes
 .../test/resources/cep-starting-1.2-snapshot| Bin 0 -> 6526 bytes
 .../test/resources/cep-starting-snapshot-1.2| Bin 6526 -> 0 bytes
 ...atefulJobSavepointFrom11MigrationITCase.java |   4 ++--
 ...atefulJobSavepointFrom12MigrationITCase.java |   4 ++--
 ...-migration-itcase-flink1.1-rocksdb-savepoint | Bin 0 -> 22283 bytes
 ...-migration-itcase-flink1.1-savepoint-rocksdb | Bin 22283 -> 0 bytes
 ...-migration-itcase-flink1.2-rocksdb-savepoint | Bin 0 -> 25256 bytes
 ...-migration-itcase-flink1.2-savepoint-rocksdb | Bin 25256 -> 0 bytes
 pom.xml |  19 ++-
 27 files changed, 19 insertions(+), 34 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/4b485307/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
--
diff --git 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
index 7cc1f9c..c07ebd5 100644
--- 
a/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
+++ 
b/flink-connectors/flink-connector-kafka-base/src/test/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaConsumerBaseFrom11MigrationTest.java
@@ -67,7 +67,7 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
-   
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+   
getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
 
// assert that no partitions were found and is empty
@@ -101,10 +101,10 @@ public class FlinkKafkaConsumerBaseFrom11MigrationTest {
testHarness.setup();
// restore state from binary snapshot file using legacy method
testHarness.initializeStateFromLegacyCheckpoint(
-   
getResourceFilename("kafka-consumer-migration-test-flink1.1-snapshot-empty-state"));
+   
getResourceFilename("kafka-consumer-migration-test-flink1.1-empty-state-snapshot"));
testHarness.open();
 
-   // the expected state in 
"kafka-consumer-migration-test-flink1.1-snapshot-empty-state";
+   // the expected state in 
"kafka-consumer-migration-test-flink1.1-empty-state-snapshot";
// since the state is empty, the consume

[2/5] flink git commit: [FLINK-6639][docs] fix code tabs in CEP docs

2017-05-19 Thread chesnay
[FLINK-6639][docs] fix code tabs in CEP docs

This closes #3952.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/fadc026b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/fadc026b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/fadc026b

Branch: refs/heads/master
Commit: fadc026bf1e90cd001bd442e5bca595eb69907cf
Parents: 8ccaffe
Author: David Anderson 
Authored: Fri May 19 15:47:17 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:08:34 2017 +0200

--
 docs/dev/libs/cep.md | 1 +
 1 file changed, 1 insertion(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/fadc026b/docs/dev/libs/cep.md
--
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index 58e1a0a..a5ca8b1 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -282,6 +282,7 @@ Pattern nonStrictNext = 
start.followedBy("middle");
 val nonStrictNext : Pattern[Event, _] = start.followedBy("middle")
 {% endhighlight %}
 
+
 
 For non-strict contiguity one can specify if only the first succeeding 
matching event will be matched, or
 all. In the latter case multiple matches will be emitted for the same 
beginning.



[5/5] flink git commit: [FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

2017-05-19 Thread chesnay
[FLINK-5636][metrics] Measure numRecordsIn in StreamTwoInputProcessor

This closes #3950.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8ccaffe3
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8ccaffe3
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8ccaffe3

Branch: refs/heads/master
Commit: 8ccaffe3d3f2472fc12fa138c45c0b67458ad2a2
Parents: 4b48530
Author: zentol 
Authored: Fri May 19 14:39:20 2017 +0200
Committer: zentol 
Committed: Fri May 19 21:08:34 2017 +0200

--
 .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 9 +
 1 file changed, 9 insertions(+)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8ccaffe3/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
--
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
index d34686d..367b773 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.IllegalConfigurationException;
 import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.metrics.Counter;
 import org.apache.flink.metrics.Gauge;
 import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -38,6 +39,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputGate;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
+import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup;
 import org.apache.flink.runtime.metrics.groups.TaskIOMetricGroup;
 import org.apache.flink.runtime.plugable.DeserializationDelegate;
 import org.apache.flink.runtime.plugable.NonReusingDeserializationDelegate;
@@ -114,6 +116,8 @@ public class StreamTwoInputProcessor {
private long lastEmittedWatermark1;
private long lastEmittedWatermark2;
 
+   private Counter numRecordsIn;
+
private boolean isFinished;
 
@SuppressWarnings("unchecked")
@@ -195,6 +199,9 @@ public class StreamTwoInputProcessor {
if (isFinished) {
return false;
}
+   if (numRecordsIn == null) {
+   numRecordsIn = ((OperatorMetricGroup) 
streamOperator.getMetricGroup()).getIOMetricGroup().getNumRecordsInCounter();
+   }
 
while (true) {
if (currentRecordDeserializer != null) {
@@ -230,6 +237,7 @@ public class StreamTwoInputProcessor {
else {
StreamRecord 
record = recordOrWatermark.asRecord();
synchronized (lock) {
+   
numRecordsIn.inc();

streamOperator.setKeyContextElement1(record);

streamOperator.processElement1(record);
}
@@ -256,6 +264,7 @@ public class StreamTwoInputProcessor {
else {
StreamRecord 
record = recordOrWatermark.asRecord();
synchronized (lock) {
+   
numRecordsIn.inc();

streamOperator.setKeyContextElement2(record);

streamOperator.processElement2(record);
}



[4/5] flink git commit: [FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

2017-05-19 Thread chesnay
[FLINK-6439] Fix close OutputStream && InputStream in OperatorSnapshotUtil

This closes #3904.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65fdadac
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65fdadac
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65fdadac

Branch: refs/heads/master
Commit: 65fdadac805cb1efe30ff9a57605676b1b8e45b9
Parents: 17ec6f0
Author: zjureel 
Authored: Mon May 15 18:14:11 2017 +0800
Committer: zentol 
Committed: Fri May 19 21:08:34 2017 +0200

--
 .../streaming/util/OperatorSnapshotUtil.java| 162 ++-
 1 file changed, 82 insertions(+), 80 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/65fdadac/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
--
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
index 92a9452..8011279 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/util/OperatorSnapshotUtil.java
@@ -46,111 +46,113 @@ public class OperatorSnapshotUtil {
 
public static void writeStateHandle(OperatorStateHandles state, String 
path) throws IOException {
FileOutputStream out = new FileOutputStream(path);
-   DataOutputStream dos = new DataOutputStream(out);
-
-   dos.writeInt(state.getOperatorChainIndex());
-
-   
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
-
-   Collection rawOperatorState = 
state.getRawOperatorState();
-   if (rawOperatorState != null) {
-   dos.writeInt(rawOperatorState.size());
-   for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
-   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   
+   try (DataOutputStream dos = new DataOutputStream(out)) {
+
+   dos.writeInt(state.getOperatorChainIndex());
+
+   
SavepointV1Serializer.serializeStreamStateHandle(state.getLegacyOperatorState(),
 dos);
+
+   Collection rawOperatorState = 
state.getRawOperatorState();
+   if (rawOperatorState != null) {
+   dos.writeInt(rawOperatorState.size());
+   for (OperatorStateHandle operatorStateHandle : 
rawOperatorState) {
+   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   }
+   } else {
+   // this means no states, not even an empty list
+   dos.writeInt(-1);
}
-   } else {
-   // this means no states, not even an empty list
-   dos.writeInt(-1);
-   }
 
-   Collection managedOperatorState = 
state.getManagedOperatorState();
-   if (managedOperatorState != null) {
-   dos.writeInt(managedOperatorState.size());
-   for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
-   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   Collection managedOperatorState = 
state.getManagedOperatorState();
+   if (managedOperatorState != null) {
+   dos.writeInt(managedOperatorState.size());
+   for (OperatorStateHandle operatorStateHandle : 
managedOperatorState) {
+   
SavepointV1Serializer.serializeOperatorStateHandle(operatorStateHandle, dos);
+   }
+   } else {
+   // this means no states, not even an empty list
+   dos.writeInt(-1);
}
-   } else {
-   // this means no states, not even an empty list
-   dos.writeInt(-1);
-   }
 
-   Collection rawKeyedState = 
state.getRawKeyedState();
-   if (rawKeyedState != null) {
-   dos.writeInt(rawKeyedState.size());
-   for (KeyedStateHandle keyedStateHandle : rawKeyedState) 
{
-   
SavepointV1Serializ

flink git commit: [FLINK-6634] [cep] NFASerializer serializes ComputationState counter.

2017-05-19 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/release-1.3 f6a596fe3 -> c62553c00


[FLINK-6634] [cep] NFASerializer serializes ComputationState counter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c62553c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c62553c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c62553c0

Branch: refs/heads/release-1.3
Commit: c62553c00d68c1d17078522652fd5ffbaf6bbb90
Parents: f6a596f
Author: kkloudas 
Authored: Fri May 19 10:40:44 2017 +0200
Committer: kkloudas 
Committed: Fri May 19 16:28:56 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  7 ++-
 .../flink/cep/operator/CEPOperatorTest.java | 50 +++-
 2 files changed, 43 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/c62553c0/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a977a7f..ff5a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -930,6 +930,7 @@ public class NFA implements Serializable {

timestampSerializer.serialize(computationState.getTimestamp(), target);

versionSerializer.serialize(computationState.getVersion(), target);

timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+   target.writeInt(computationState.getCounter());
 
if (computationState.getEvent() == null) {
target.writeBoolean(false);
@@ -963,6 +964,7 @@ public class NFA implements Serializable {
long timestamp = 
timestampSerializer.deserialize(source);
DeweyNumber version = 
versionSerializer.deserialize(source);
long startTimestamp = 
timestampSerializer.deserialize(source);
+   int counter = source.readInt();
 
T event = null;
if (source.readBoolean()) {
@@ -970,7 +972,7 @@ public class NFA implements Serializable {
}
 

computationStates.add(ComputationState.createState(
-   nfa, state, prevState, event, 
0, timestamp, version, startTimestamp));
+   nfa, state, prevState, event, 
counter, timestamp, version, startTimestamp));
}
 
nfa.computationStates = computationStates;
@@ -1028,6 +1030,9 @@ public class NFA implements Serializable {
long startTimestamp = 
timestampSerializer.deserialize(source);
timestampSerializer.serialize(startTimestamp, 
target);
 
+   int counter = source.readInt();
+   target.writeInt(counter);
+
boolean hasEvent = source.readBoolean();
target.writeBoolean(hasEvent);
if (hasEvent) {

http://git-wip-us.apache.org/repos/asf/flink/blob/c62553c0/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 41593b0..95e3a37 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -139,6 +140,8 @@ public class CEPOperatorTest extends TestLogger {
}
 
@Test
+   @Ignore
+   // TODO: 5/19/17 Re-instate when checkpoints are fixed
public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws 
Exception {
 
String rocksDbPath = temp

flink git commit: [FLINK-6634] [cep] NFASerializer serializes ComputationState counter.

2017-05-19 Thread kkloudas
Repository: flink
Updated Branches:
  refs/heads/master 8814ba767 -> 8e3213678


[FLINK-6634] [cep] NFASerializer serializes ComputationState counter.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8e321367
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8e321367
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8e321367

Branch: refs/heads/master
Commit: 8e32136783a1a3db17d06c1fd9d012dcc4e458aa
Parents: 8814ba7
Author: kkloudas 
Authored: Fri May 19 10:40:44 2017 +0200
Committer: kkloudas 
Committed: Fri May 19 16:23:33 2017 +0200

--
 .../main/java/org/apache/flink/cep/nfa/NFA.java |  7 ++-
 .../flink/cep/operator/CEPOperatorTest.java | 50 +++-
 2 files changed, 43 insertions(+), 14 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8e321367/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
--
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
index a977a7f..ff5a342 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/nfa/NFA.java
@@ -930,6 +930,7 @@ public class NFA implements Serializable {

timestampSerializer.serialize(computationState.getTimestamp(), target);

versionSerializer.serialize(computationState.getVersion(), target);

timestampSerializer.serialize(computationState.getStartTimestamp(), target);
+   target.writeInt(computationState.getCounter());
 
if (computationState.getEvent() == null) {
target.writeBoolean(false);
@@ -963,6 +964,7 @@ public class NFA implements Serializable {
long timestamp = 
timestampSerializer.deserialize(source);
DeweyNumber version = 
versionSerializer.deserialize(source);
long startTimestamp = 
timestampSerializer.deserialize(source);
+   int counter = source.readInt();
 
T event = null;
if (source.readBoolean()) {
@@ -970,7 +972,7 @@ public class NFA implements Serializable {
}
 

computationStates.add(ComputationState.createState(
-   nfa, state, prevState, event, 
0, timestamp, version, startTimestamp));
+   nfa, state, prevState, event, 
counter, timestamp, version, startTimestamp));
}
 
nfa.computationStates = computationStates;
@@ -1028,6 +1030,9 @@ public class NFA implements Serializable {
long startTimestamp = 
timestampSerializer.deserialize(source);
timestampSerializer.serialize(startTimestamp, 
target);
 
+   int counter = source.readInt();
+   target.writeInt(counter);
+
boolean hasEvent = source.readBoolean();
target.writeBoolean(hasEvent);
if (hasEvent) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8e321367/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
--
diff --git 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 41593b0..95e3a37 100644
--- 
a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ 
b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.TestLogger;
 import org.junit.Assert;
+import org.junit.Ignore;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
@@ -139,6 +140,8 @@ public class CEPOperatorTest extends TestLogger {
}
 
@Test
+   @Ignore
+   // TODO: 5/19/17 Re-instate when checkpoints are fixed
public void testKeyedCEPOperatorCheckpointingWithRocksDB() throws 
Exception {
 
String rocksDbPath = tempFolder.new

[2/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

2017-05-19 Thread trohrmann
[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user 
classloader is applied

This closes #3933.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ad08163
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ad08163
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ad08163

Branch: refs/heads/master
Commit: 2ad081636e54a2b8fd98a935a95c0949818843ad
Parents: acea4cd
Author: Wright, Eron 
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann 
Committed: Fri May 19 15:22:49 2017 +0200

--
 .../runtime/checkpoint/hooks/MasterHooks.java   | 108 +++
 .../executiongraph/ExecutionGraphBuilder.java   |  15 ++-
 .../checkpoint/hooks/MasterHooksTest.java   | 131 +++
 3 files changed, 251 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/2ad08163/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
}
 
// 

+   //  hook management
+   // 

+
+   /**
+* Wraps a hook such that the user-code classloader is applied when the 
hook is invoked.
+* @param hook the hook to wrap
+* @param userClassLoader the classloader to use
+*/
+   public static  MasterTriggerRestoreHook 
wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+   return new WrappedMasterHook(hook, userClassLoader);
+   }
+
+   @VisibleForTesting
+   static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
+
+   private final MasterTriggerRestoreHook hook;
+   private final ClassLoader userClassLoader;
+
+   WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader 
userClassLoader) {
+   this.hook = hook;
+   this.userClassLoader = userClassLoader;
+   }
+
+   @Override
+   public String getIdentifier() {
+   Thread thread = Thread.currentThread();
+   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   thread.setContextClassLoader(userClassLoader);
+   try {
+   return hook.getIdentifier();
+   }
+   finally {
+   
thread.setContextClassLoader(originalClassLoader);
+   }
+   }
+
+   @Nullable
+   @Override
+   public Future triggerCheckpoint(long checkpointId, long 
timestamp, final Executor executor) throws Exception {
+   Executor wrappedExecutor = new Executor() {
+   @Override
+   public void execute(Runnable command) {
+   executor.execute(new 
WrappedCommand(command));
+   }
+   };
+
+   Thread thread = Thread.currentThread();
+   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   thread.setContextClassLoader(userClassLoader);
+   try {
+   return hook.triggerCheckpoint(checkpointId, 
timestamp, wrappedExecutor);
+   }
+   finally {
+   
thread.setContextClassLoader(originalClassLoader);
+   }
+   }
+
+ 

[1/2] flink git commit: [FLINK-6606] Hide WrapperMasterHook by making it private

2017-05-19 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/master acea4cde5 -> 8814ba767


[FLINK-6606] Hide WrapperMasterHook by making it private


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8814ba76
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8814ba76
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8814ba76

Branch: refs/heads/master
Commit: 8814ba767fc9af610d1c799338fa4168afc4f3fb
Parents: 2ad0816
Author: Till Rohrmann 
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann 
Committed: Fri May 19 15:22:49 2017 +0200

--
 .../runtime/checkpoint/hooks/MasterHooks.java   | 41 +++-
 .../executiongraph/ExecutionGraphBuilder.java   |  5 ++-
 .../checkpoint/hooks/MasterHooksTest.java   | 11 ++
 3 files changed, 29 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/8814ba76/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
 * @param userClassLoader the classloader to use
 */
public static  MasterTriggerRestoreHook 
wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-   return new WrappedMasterHook(hook, userClassLoader);
+   return new WrappedMasterHook<>(hook, userClassLoader);
}
 
-   @VisibleForTesting
-   static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
+   private static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
 
private final MasterTriggerRestoreHook hook;
private final ClassLoader userClassLoader;
 
WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader 
userClassLoader) {
-   this.hook = hook;
-   this.userClassLoader = userClassLoader;
+   this.hook = Preconditions.checkNotNull(hook);
+   this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
}
 
@Override
public String getIdentifier() {
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader originalClassLoader = 
thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.getIdentifier();
}
@@ -315,9 +315,10 @@ public class MasterHooks {
}
};
 
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader originalClassLoader = 
thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.triggerCheckpoint(checkpointId, 
timestamp, wrappedExecutor);
}
@@ -328,9 +329,10 @@ public class MasterHooks {
 
@Override
public void restoreCheckpoint(long checkpointId, @Nullable T 
checkpointData) throws Exception {
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader or

[2/2] flink git commit: [FLINK-6606] Hide WrapperMasterHook by making it private

2017-05-19 Thread trohrmann
[FLINK-6606] Hide WrapperMasterHook by making it private


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6a596fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6a596fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6a596fe

Branch: refs/heads/release-1.3
Commit: f6a596fe3f8924602c34636959e3d7e48cb2262f
Parents: 3dba48e
Author: Till Rohrmann 
Authored: Fri May 19 15:06:35 2017 +0200
Committer: Till Rohrmann 
Committed: Fri May 19 15:21:24 2017 +0200

--
 .../runtime/checkpoint/hooks/MasterHooks.java   | 41 +++-
 .../executiongraph/ExecutionGraphBuilder.java   |  5 ++-
 .../checkpoint/hooks/MasterHooksTest.java   | 11 ++
 3 files changed, 29 insertions(+), 28 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f6a596fe/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 737e816..1851eb6 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
-import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -28,6 +27,7 @@ import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 
+import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import javax.annotation.Nullable;
@@ -278,25 +278,25 @@ public class MasterHooks {
 * @param userClassLoader the classloader to use
 */
public static  MasterTriggerRestoreHook 
wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
-   return new WrappedMasterHook(hook, userClassLoader);
+   return new WrappedMasterHook<>(hook, userClassLoader);
}
 
-   @VisibleForTesting
-   static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
+   private static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
 
private final MasterTriggerRestoreHook hook;
private final ClassLoader userClassLoader;
 
WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader 
userClassLoader) {
-   this.hook = hook;
-   this.userClassLoader = userClassLoader;
+   this.hook = Preconditions.checkNotNull(hook);
+   this.userClassLoader = 
Preconditions.checkNotNull(userClassLoader);
}
 
@Override
public String getIdentifier() {
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader originalClassLoader = 
thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.getIdentifier();
}
@@ -315,9 +315,10 @@ public class MasterHooks {
}
};
 
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader originalClassLoader = 
thread.getContextClassLoader();
thread.setContextClassLoader(userClassLoader);
+
try {
return hook.triggerCheckpoint(checkpointId, 
timestamp, wrappedExecutor);
}
@@ -328,9 +329,10 @@ public class MasterHooks {
 
@Override
public void restoreCheckpoint(long checkpointId, @Nullable T 
checkpointData) throws Exception {
-   Thread thread = Thread.currentThread();
-   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   final Thread thread = Thread.currentThread();
+   final ClassLoader originalClassLoader = 
thread.getContextClassLoader();
   

[1/2] flink git commit: [FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

2017-05-19 Thread trohrmann
Repository: flink
Updated Branches:
  refs/heads/release-1.3 6d178a959 -> f6a596fe3


[FLINK-6606] Set UserCodeClassLoader as TCCL for MasterTriggerRestoreHook

- wrap calls to MasterTriggerRestoreHook (and its factory) such that the user 
classloader is applied

This closes #3933.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3dba48ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3dba48ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3dba48ee

Branch: refs/heads/release-1.3
Commit: 3dba48ee6ad3dc6bdb9abfaa91accbb8581cfd2a
Parents: 6d178a9
Author: Wright, Eron 
Authored: Wed May 17 09:46:13 2017 -0700
Committer: Till Rohrmann 
Committed: Fri May 19 15:21:23 2017 +0200

--
 .../runtime/checkpoint/hooks/MasterHooks.java   | 108 +++
 .../executiongraph/ExecutionGraphBuilder.java   |  15 ++-
 .../checkpoint/hooks/MasterHooksTest.java   | 131 +++
 3 files changed, 251 insertions(+), 3 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3dba48ee/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
--
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
index 409019e..737e816 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/hooks/MasterHooks.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.checkpoint.hooks;
 
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
@@ -29,6 +30,7 @@ import org.apache.flink.util.FlinkException;
 
 import org.slf4j.Logger;
 
+import javax.annotation.Nullable;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.LinkedHashMap;
@@ -267,6 +269,112 @@ public class MasterHooks {
}
 
// 

+   //  hook management
+   // 

+
+   /**
+* Wraps a hook such that the user-code classloader is applied when the 
hook is invoked.
+* @param hook the hook to wrap
+* @param userClassLoader the classloader to use
+*/
+   public static  MasterTriggerRestoreHook 
wrapHook(MasterTriggerRestoreHook hook, ClassLoader userClassLoader) {
+   return new WrappedMasterHook(hook, userClassLoader);
+   }
+
+   @VisibleForTesting
+   static class WrappedMasterHook implements 
MasterTriggerRestoreHook {
+
+   private final MasterTriggerRestoreHook hook;
+   private final ClassLoader userClassLoader;
+
+   WrappedMasterHook(MasterTriggerRestoreHook hook, ClassLoader 
userClassLoader) {
+   this.hook = hook;
+   this.userClassLoader = userClassLoader;
+   }
+
+   @Override
+   public String getIdentifier() {
+   Thread thread = Thread.currentThread();
+   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   thread.setContextClassLoader(userClassLoader);
+   try {
+   return hook.getIdentifier();
+   }
+   finally {
+   
thread.setContextClassLoader(originalClassLoader);
+   }
+   }
+
+   @Nullable
+   @Override
+   public Future triggerCheckpoint(long checkpointId, long 
timestamp, final Executor executor) throws Exception {
+   Executor wrappedExecutor = new Executor() {
+   @Override
+   public void execute(Runnable command) {
+   executor.execute(new 
WrappedCommand(command));
+   }
+   };
+
+   Thread thread = Thread.currentThread();
+   ClassLoader originalClassLoader = 
thread.getContextClassLoader();
+   thread.setContextClassLoader(userClassLoader);
+   try {
+   return hook.triggerCheckpoint(checkpointId, 
timestamp, wrappedExecutor);
+   }
+   finally {
+   
thread.setCo

flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.

2017-05-19 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/release-1.3 d1cff3ae3 -> 6d178a959


[FLINK-6574] [table] Support nested catalogs in ExternalCatalog.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6d178a95
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6d178a95
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6d178a95

Branch: refs/heads/release-1.3
Commit: 6d178a9597f5f9b79a0e3a6f4a61a10734188c85
Parents: d1cff3a
Author: Haohui Mai 
Authored: Mon May 15 17:09:18 2017 -0700
Committer: Fabian Hueske 
Committed: Fri May 19 14:23:11 2017 +0200

--
 .../org/apache/flink/table/api/exceptions.scala |  48 +++
 .../table/catalog/CrudExternalCatalog.scala |  78 +-
 .../flink/table/catalog/ExternalCatalog.scala   |  38 ++---
 .../table/catalog/ExternalCatalogDatabase.scala |  31 
 .../table/catalog/ExternalCatalogSchema.scala   |  91 +---
 .../table/catalog/ExternalCatalogTable.scala|  16 ---
 .../table/catalog/InMemoryExternalCatalog.scala | 142 +++
 .../flink/table/ExternalCatalogTest.scala   |  33 +
 .../catalog/InMemoryExternalCatalogTest.scala   | 103 +++---
 .../flink/table/utils/CommonTestData.scala  |  17 ++-
 10 files changed, 259 insertions(+), 338 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6d178a95/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf75..7ea17fa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends 
RuntimeException(msg)
 /**
   * Exception for an operation on a nonexistent table
   *
-  * @param dbdatabase name
-  * @param table table name
-  * @param cause the cause
+  * @param catalogcatalog name
+  * @param table  table name
+  * @param cause  the cause
   */
 case class TableNotExistException(
-db: String,
+catalog: String,
 table: String,
 cause: Throwable)
-extends RuntimeException(s"Table $db.$table does not exist.", cause) {
+extends RuntimeException(s"Table $catalog.$table does not exist.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
   * Exception for adding an already existent table
   *
-  * @param dbdatabase name
-  * @param table table name
-  * @param cause the cause
+  * @param catalogcatalog name
+  * @param table  table name
+  * @param cause  the cause
   */
 case class TableAlreadyExistException(
-db: String,
+catalog: String,
 table: String,
 cause: Throwable)
-extends RuntimeException(s"Table $db.$table already exists.", cause) {
+extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
-  * Exception for operation on a nonexistent database
+  * Exception for operation on a nonexistent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseNotExistException(
-db: String,
+case class CatalogNotExistException(
+catalog: String,
 cause: Throwable)
-extends RuntimeException(s"Database $db does not exist.", cause) {
+extends RuntimeException(s"Catalog $catalog does not exist.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**
-  * Exception for adding an already existent database
+  * Exception for adding an already existent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseAlreadyExistException(
-db: String,
+case class CatalogAlreadyExistException(
+catalog: String,
 cause: Throwable)
-extends RuntimeException(s"Database $db already exists.", cause) {
+extends RuntimeException(s"Catalog $catalog already exists.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/6d178a95/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
---

flink git commit: [FLINK-6574] [table] Support nested catalogs in ExternalCatalog.

2017-05-19 Thread fhueske
Repository: flink
Updated Branches:
  refs/heads/master 6ae759ae5 -> acea4cde5


[FLINK-6574] [table] Support nested catalogs in ExternalCatalog.

This closes #3913.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/acea4cde
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/acea4cde
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/acea4cde

Branch: refs/heads/master
Commit: acea4cde5f0225db9e00bbef4a47fdb58419022b
Parents: 6ae759a
Author: Haohui Mai 
Authored: Mon May 15 17:09:18 2017 -0700
Committer: Fabian Hueske 
Committed: Fri May 19 14:21:19 2017 +0200

--
 .../org/apache/flink/table/api/exceptions.scala |  48 +++
 .../table/catalog/CrudExternalCatalog.scala |  78 +-
 .../flink/table/catalog/ExternalCatalog.scala   |  38 ++---
 .../table/catalog/ExternalCatalogDatabase.scala |  31 
 .../table/catalog/ExternalCatalogSchema.scala   |  91 +---
 .../table/catalog/ExternalCatalogTable.scala|  16 ---
 .../table/catalog/InMemoryExternalCatalog.scala | 142 +++
 .../flink/table/ExternalCatalogTest.scala   |  33 +
 .../catalog/InMemoryExternalCatalogTest.scala   | 103 +++---
 .../flink/table/utils/CommonTestData.scala  |  17 ++-
 10 files changed, 259 insertions(+), 338 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
--
diff --git 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
index 760cf75..7ea17fa 100644
--- 
a/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
+++ 
b/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/api/exceptions.scala
@@ -77,63 +77,63 @@ case class UnresolvedException(msg: String) extends 
RuntimeException(msg)
 /**
   * Exception for an operation on a nonexistent table
   *
-  * @param dbdatabase name
-  * @param table table name
-  * @param cause the cause
+  * @param catalogcatalog name
+  * @param table  table name
+  * @param cause  the cause
   */
 case class TableNotExistException(
-db: String,
+catalog: String,
 table: String,
 cause: Throwable)
-extends RuntimeException(s"Table $db.$table does not exist.", cause) {
+extends RuntimeException(s"Table $catalog.$table does not exist.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
   * Exception for adding an already existent table
   *
-  * @param dbdatabase name
-  * @param table table name
-  * @param cause the cause
+  * @param catalogcatalog name
+  * @param table  table name
+  * @param cause  the cause
   */
 case class TableAlreadyExistException(
-db: String,
+catalog: String,
 table: String,
 cause: Throwable)
-extends RuntimeException(s"Table $db.$table already exists.", cause) {
+extends RuntimeException(s"Table $catalog.$table already exists.", cause) {
 
-  def this(db: String, table: String) = this(db, table, null)
+  def this(catalog: String, table: String) = this(catalog, table, null)
 
 }
 
 /**
-  * Exception for operation on a nonexistent database
+  * Exception for operation on a nonexistent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseNotExistException(
-db: String,
+case class CatalogNotExistException(
+catalog: String,
 cause: Throwable)
-extends RuntimeException(s"Database $db does not exist.", cause) {
+extends RuntimeException(s"Catalog $catalog does not exist.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**
-  * Exception for adding an already existent database
+  * Exception for adding an already existent catalog
   *
-  * @param db database name
+  * @param catalog catalog name
   * @param cause the cause
   */
-case class DatabaseAlreadyExistException(
-db: String,
+case class CatalogAlreadyExistException(
+catalog: String,
 cause: Throwable)
-extends RuntimeException(s"Database $db already exists.", cause) {
+extends RuntimeException(s"Catalog $catalog already exists.", cause) {
 
-  def this(db: String) = this(db, null)
+  def this(catalog: String) = this(catalog, null)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/acea4cde/flink-libraries/flink-table/src/main/scala/org/apache/flink/table/catalog/CrudExternalCatalog.scala
-

flink git commit: [FLINK-6582] [docs] Project from maven archetype is not buildable by default

2017-05-19 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/release-1.3 f62004079 -> d1cff3ae3


[FLINK-6582] [docs] Project from maven archetype is not buildable by default

The pom.xml for flink-quickstart-java and flink-quickstart-scala must
specify scala.version and scala.binary.version.

This closes #3910


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d1cff3ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d1cff3ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d1cff3ae

Branch: refs/heads/release-1.3
Commit: d1cff3ae32665263e404c30d9bc21205092123db
Parents: f620040
Author: Greg Hogan 
Authored: Mon May 15 09:35:36 2017 -0400
Committer: Robert Metzger 
Committed: Fri May 19 11:45:09 2017 +0200

--
 .../main/resources/archetype-resources/pom.xml| 14 ++
 .../main/resources/archetype-resources/pom.xml|  5 +++--
 flink-quickstart/pom.xml  | 18 ++
 3 files changed, 27 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 1e525fd..b04f75f 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -19,7 +19,7 @@ under the License.
 http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
4.0.0
-   
+
${groupId}
${artifactId}
${version}
@@ -33,6 +33,7 @@ under the License.
1.3-SNAPSHOT
1.7.7
1.2.17
+   
@scala.binary.version@

 

@@ -48,9 +49,9 @@ under the License.



-   
+


-   

 

http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 5abe496..6a5eed0 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -48,10 +48,11 @@ under the License.
1.3-SNAPSHOT
1.7.7
1.2.17
+   
@scala.binary.version@

 



http://git-wip-us.apache.org/repos/asf/flink/blob/d1cff3ae/flink-quickstart/pom.xml
--
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 068bbc9..2acccd4 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -80,6 +80,24 @@ under the License.
true


+
+   
+   
+   org.apache.maven.plugins
+   maven-resources-plugin
+   
+   
false
+   
+   @
+   
+   
+   

+   
+   
+   src/main/resources
+   true
+   
+   

 



flink git commit: [FLINK-6582] [docs] Project from maven archetype is not buildable by default

2017-05-19 Thread rmetzger
Repository: flink
Updated Branches:
  refs/heads/master 0162543ac -> 6ae759ae5


[FLINK-6582] [docs] Project from maven archetype is not buildable by default

The pom.xml for flink-quickstart-java and flink-quickstart-scala must
specify scala.version and scala.binary.version.

This closes #3910


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6ae759ae
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6ae759ae
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6ae759ae

Branch: refs/heads/master
Commit: 6ae759ae5ece9cfa95bf49ddd44c17397903eaca
Parents: 0162543
Author: Greg Hogan 
Authored: Mon May 15 09:35:36 2017 -0400
Committer: Robert Metzger 
Committed: Fri May 19 11:43:30 2017 +0200

--
 .../main/resources/archetype-resources/pom.xml| 14 ++
 .../main/resources/archetype-resources/pom.xml|  5 +++--
 flink-quickstart/pom.xml  | 18 ++
 3 files changed, 27 insertions(+), 10 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
index 9991d2c..44c56b9 100644
--- 
a/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-java/src/main/resources/archetype-resources/pom.xml
@@ -19,7 +19,7 @@ under the License.
 http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
4.0.0
-   
+
${groupId}
${artifactId}
${version}
@@ -33,6 +33,7 @@ under the License.
1.4-SNAPSHOT
1.7.7
1.2.17
+   
@scala.binary.version@

 

@@ -48,9 +49,9 @@ under the License.



-   
+


-   

 

http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
--
diff --git 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
index 2139c6b..ff33a6e 100644
--- 
a/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
+++ 
b/flink-quickstart/flink-quickstart-scala/src/main/resources/archetype-resources/pom.xml
@@ -48,10 +48,11 @@ under the License.
1.4-SNAPSHOT
1.7.7
1.2.17
+   
@scala.binary.version@

 



http://git-wip-us.apache.org/repos/asf/flink/blob/6ae759ae/flink-quickstart/pom.xml
--
diff --git a/flink-quickstart/pom.xml b/flink-quickstart/pom.xml
index 7e1b460..ea669cd 100644
--- a/flink-quickstart/pom.xml
+++ b/flink-quickstart/pom.xml
@@ -80,6 +80,24 @@ under the License.
true


+
+   
+   
+   org.apache.maven.plugins
+   maven-resources-plugin
+   
+   
false
+   
+   @
+   
+   
+   

+   
+   
+   src/main/resources
+   true
+   
+   

 



[1/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

2017-05-19 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/release-1.3 0963718ac -> f62004079


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
new file mode 100644
index 000..0c215cd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ *  Tests include:
+ * 
+ * Expected usage of operations
+ * Correct ordering of ZooKeeper and state handle operations
+ * 
+ */
+public class ZooKeeperStateHandleStoreTest extends TestLogger {
+
+   private static final ZooKeeperTestEnvironment ZOOKEEPER = new 
ZooKeeperTestEnvironment(1);
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (ZOOKEEPER != null) {
+   ZOOKEEPER.shutdown();
+   }
+   }
+
+   @Before
+   public void cleanUp() throws Exception {
+   ZOOKEEPER.deleteAll();
+   }
+
+   /**
+* Tests add operation with lock.
+*/
+   @Test
+   public void testAddAndLock() throws Exception {
+   LongStateStorage longStateStorage = new LongStateStorage();
+   ZooKeeperStateHandleStore store = new 
ZooKeeperStateHandleStore(
+   ZOOKEEPER.getClient(), longStateStorage, 
Executors.directExecutor());
+
+   // Config
+   final String pathInZooKeeper = "/testAdd";
+   final Long state = 1239712317L;
+
+   // Test
+   store.addAndLock(pathInZooKeeper, state);
+
+   // Verify
+   // State handle created
+   assertEquals(1, store.getAllAndLock().size());
+   assertEquals(state, 
store.getAndLock(pathInZooKeeper).retrieveState());
+
+   // Path created and is persistent
+   Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+   assertNotNull(stat);
+   assertEquals(0, stat.getEphemeralOwner());
+
+   List children = 
ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper);
+
+   // there should be one child which is the lock
+   

[3/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

2017-05-19 Thread srichter
http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
new file mode 100644
index 000..2a6975a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class IncrementalKeyedStateHandleTest {
+
+   /**
+* This test checks, that for an unregistered {@link 
IncrementalKeyedStateHandle} all state
+* (including shared) is discarded.
+*/
+   @Test
+   public void testUnregisteredDiscarding() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = create(new 
Random(42));
+
+   stateHandle.discardState();
+
+   for (StreamStateHandle handle : 
stateHandle.getPrivateState().values()) {
+   verify(handle).discardState();
+   }
+
+   for (StreamStateHandle handle : 
stateHandle.getSharedState().values()) {
+   verify(handle).discardState();
+   }
+
+   verify(stateHandle.getMetaStateHandle()).discardState();
+   }
+
+   /**
+* This test checks, that for a registered {@link 
IncrementalKeyedStateHandle} discards respect
+* all shared state and only discard it one all references are released.
+*/
+   @Test
+   public void testSharedStateDeRegistration() throws Exception {
+
+   Random rnd = new Random(42);
+
+   SharedStateRegistry registry = spy(new SharedStateRegistry());
+
+   // Create two state handles with overlapping shared state
+   IncrementalKeyedStateHandle stateHandle1 = create(new 
Random(42));
+   IncrementalKeyedStateHandle stateHandle2 = create(new 
Random(42));
+
+   // Both handles should not be registered and not discarded by 
now.
+   for (Map.Entry entry :
+   stateHandle1.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+   verify(registry, 
times(0)).unregisterReference(registryKey);
+   verify(entry.getValue(), times(0)).discardState();
+   }
+
+   for (Map.Entry entry :
+   stateHandle2.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+   verify(registry, 
times(0)).unregisterReference(registryKey);
+   verify(entry.getValue(), times(0)).discardState();
+   }
+
+   // Now we register both ...
+   stateHandle1.registerSharedStates(registry);
+   stateHandle2.registerSharedStates(registry);
+
+   for (Map.Entry 
stateHandleEntry :
+   stateHandle1.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+   verify(registry).registerReference(
+   registryKey,
+   stateHandleEntry.getValue());
+   }
+
+   for (Map.Entry 
stateHandleEntry :
+ 

[2/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

2017-05-19 Thread srichter
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

In order to guard against deletions of ZooKeeper nodes which are still being 
used
by a different ZooKeeperStateHandleStore, we have to introduce a locking 
mechanism.
Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is
allowed to be deleted.

THe locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, 
thus,
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it does not have any children associated.

In order to guard against oprhaned lock nodes, they are created as ephemeral 
nodes.
This means that they will be deleted by ZooKeeper once the connection of the
ZooKeeper client which created the node timed out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f58fec70
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f58fec70
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f58fec70

Branch: refs/heads/release-1.3
Commit: f58fec70fef12056bd58b6cc2985532ccb07625e
Parents: 0963718
Author: Till Rohrmann 
Authored: Wed May 17 14:52:04 2017 +0200
Committer: Stefan Richter 
Committed: Fri May 19 11:00:07 2017 +0200

--
 .../store/ZooKeeperMesosWorkerStore.java|   8 +-
 .../ZooKeeperCompletedCheckpointStore.java  | 150 ++--
 .../ZooKeeperSubmittedJobGraphStore.java|  50 +-
 .../zookeeper/ZooKeeperStateHandleStore.java| 419 +++---
 .../CompletedCheckpointStoreTest.java   |   9 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  11 +-
 .../ZooKeeperStateHandleStoreITCase.java| 642 ---
 .../ZooKeeperStateHandleStoreTest.java  | 805 +++
 9 files changed, 1345 insertions(+), 882 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f58fec70/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
--
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 42abd4c..663ce56 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
totalTaskCountInZooKeeper.close();
 
if(cleanup) {
-   
workersInZooKeeper.removeAndDiscardAllState();
+   
workersInZooKeeper.releaseAndTryRemoveAll();
}
 
isRunning = false;
@@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
synchronized (startStopLock) {
verifyIsRunning();
 
-   List, String>> 
handles = workersInZooKeeper.getAll();
+   List, String>> 
handles = workersInZooKeeper.getAllAndLock();
 
if(handles.size() != 0) {
List workers = new 
ArrayList<>(handles.size());
@@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
int currentVersion = workersInZooKeeper.exists(path);
if (currentVersion == -1) {
try {
-   workersInZooKeeper.add(path, worker);
+   workersInZooKeeper.addAndLock(path, 
worker);
LOG.debug("Added {} in ZooKeeper.", 
worker);
} catch (KeeperException.NodeExistsException 
ex) {
throw new 
ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
@@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
return false;
}
 
-   workersInZooKeeper.removeAndDiscardState(path);
+   workersInZooKeeper.releaseAndTryRemove(path);
LOG.debug("Removed worker {} from ZooKeeper.", taskID

[4/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

2017-05-19 Thread srichter
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/f6200407
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/f6200407
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/f6200407

Branch: refs/heads/release-1.3
Commit: f6200407979fb6987f86d7029df81b345f0d9525
Parents: f58fec7
Author: Stefan Richter 
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter 
Committed: Fri May 19 11:01:12 2017 +0200

--
 .../state/RocksDBKeyedStateBackend.java |  56 +--
 .../state/RocksDBStateBackendTest.java  |  88 -
 .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-
 .../flink/runtime/checkpoint/OperatorState.java |   7 -
 .../checkpoint/OperatorSubtaskState.java|  11 -
 .../StandaloneCompletedCheckpointStore.java |   4 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |  11 -
 .../flink/runtime/checkpoint/TaskState.java |   7 -
 .../ZooKeeperCompletedCheckpointStore.java  | 149 +++-
 .../savepoint/SavepointV2Serializer.java|  17 +-
 .../runtime/state/CompositeStateHandle.java |  15 +-
 .../state/IncrementalKeyedStateHandle.java  | 171 -
 .../runtime/state/KeyGroupsStateHandle.java |   5 -
 .../state/PlaceholderStreamStateHandle.java |  44 +--
 .../runtime/state/SharedStateRegistry.java  |  54 +--
 .../state/memory/ByteStreamStateHandle.java |   7 +
 .../checkpoint/CheckpointCoordinatorTest.java   |  25 --
 .../CompletedCheckpointStoreTest.java   |  61 +--
 .../checkpoint/CompletedCheckpointTest.java |   3 -
 .../checkpoint/PendingCheckpointTest.java   |   1 -
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   7 +-
 .../savepoint/CheckpointTestUtils.java  |  25 +-
 .../state/IncrementalKeyedStateHandleTest.java  | 206 ++
 .../runtime/state/SharedStateRegistryTest.java  |  14 +-
 .../runtime/state/StateBackendTestBase.java |   2 -
 .../RecoverableCompletedCheckpointStore.java|   5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   | 375 +--
 28 files changed, 747 insertions(+), 776 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/f6200407/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
/** True if incremental checkpointing is enabled */
private final boolean enableIncrementalCheckpointing;
 
-   /** The sst files materialized in pending checkpoints */
-   private final SortedMap> 
materializedSstFiles = new TreeMap<>();
+   /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints */
+   private final SortedMap> materializedSstFiles 
= new TreeMap<>();
 
/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
private final long checkpointTimestamp;
 
/** All sst files that were part of the last previously 
completed checkpoint */
-   private Map baseSstFiles;
+   private Set baseSstFiles;
 
/** The state meta data */
private final 
List> stateMetaInfoSnapshots 
= new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
// new sst files since the last completed checkpoint
-   private final Map newSstFiles 
= new HashMap<>();
-
-   // old sst files which have been materialized in pre

[2/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

2017-05-19 Thread srichter
[FLINK-6633] Register shared state before adding to CompletedCheckpointStore


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0162543a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0162543a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0162543a

Branch: refs/heads/master
Commit: 0162543ac13f048ef67a6586d8a6e8021ec9dcd4
Parents: 3d119e1
Author: Stefan Richter 
Authored: Tue May 16 12:32:05 2017 +0200
Committer: Stefan Richter 
Committed: Fri May 19 10:57:32 2017 +0200

--
 .../state/RocksDBKeyedStateBackend.java |  56 +--
 .../state/RocksDBStateBackendTest.java  |  88 -
 .../runtime/checkpoint/CompletedCheckpoint.java | 144 ++-
 .../flink/runtime/checkpoint/OperatorState.java |   7 -
 .../checkpoint/OperatorSubtaskState.java|  11 -
 .../StandaloneCompletedCheckpointStore.java |   4 +-
 .../flink/runtime/checkpoint/SubtaskState.java  |  11 -
 .../flink/runtime/checkpoint/TaskState.java |   7 -
 .../ZooKeeperCompletedCheckpointStore.java  | 149 +++-
 .../savepoint/SavepointV2Serializer.java|  17 +-
 .../runtime/state/CompositeStateHandle.java |  15 +-
 .../state/IncrementalKeyedStateHandle.java  | 171 -
 .../runtime/state/KeyGroupsStateHandle.java |   5 -
 .../state/PlaceholderStreamStateHandle.java |  44 +--
 .../runtime/state/SharedStateRegistry.java  |  54 +--
 .../state/memory/ByteStreamStateHandle.java |   7 +
 .../checkpoint/CheckpointCoordinatorTest.java   |  25 --
 .../CompletedCheckpointStoreTest.java   |  61 +--
 .../checkpoint/CompletedCheckpointTest.java |   3 -
 .../checkpoint/PendingCheckpointTest.java   |   1 -
 ...ZooKeeperCompletedCheckpointStoreITCase.java |   7 +-
 .../savepoint/CheckpointTestUtils.java  |  25 +-
 .../state/IncrementalKeyedStateHandleTest.java  | 206 ++
 .../runtime/state/SharedStateRegistryTest.java  |  14 +-
 .../runtime/state/StateBackendTestBase.java |   2 -
 .../RecoverableCompletedCheckpointStore.java|   5 +-
 ...tractEventTimeWindowCheckpointingITCase.java |   9 +-
 .../JobManagerHACheckpointRecoveryITCase.java   | 375 +--
 28 files changed, 747 insertions(+), 776 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
--
diff --git 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
index 88a759d..1f32a89 100644
--- 
a/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
+++ 
b/flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java
@@ -105,6 +105,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.UUID;
@@ -170,8 +171,8 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
/** True if incremental checkpointing is enabled */
private final boolean enableIncrementalCheckpointing;
 
-   /** The sst files materialized in pending checkpoints */
-   private final SortedMap> 
materializedSstFiles = new TreeMap<>();
+   /** The state handle ids of all sst files materialized in snapshots for 
previous checkpoints */
+   private final SortedMap> materializedSstFiles 
= new TreeMap<>();
 
/** The identifier of the last completed checkpoint */
private long lastCompletedCheckpointId = -1;
@@ -720,7 +721,7 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
private final long checkpointTimestamp;
 
/** All sst files that were part of the last previously 
completed checkpoint */
-   private Map baseSstFiles;
+   private Set baseSstFiles;
 
/** The state meta data */
private final 
List> stateMetaInfoSnapshots 
= new ArrayList<>();
@@ -732,10 +733,7 @@ public class RocksDBKeyedStateBackend extends 
AbstractKeyedStateBackend {
private final CloseableRegistry closeableRegistry = new 
CloseableRegistry();
 
// new sst files since the last completed checkpoint
-   private final Map newSstFiles 
= new HashMap<>();
-
-   // old sst files which have been materialized in previous

[1/4] flink git commit: [FLINK-6633] Register shared state before adding to CompletedCheckpointStore

2017-05-19 Thread srichter
Repository: flink
Updated Branches:
  refs/heads/master b8f8524af -> 0162543ac


http://git-wip-us.apache.org/repos/asf/flink/blob/0162543a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
new file mode 100644
index 000..2a6975a
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -0,0 +1,206 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.state;
+
+import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+import org.junit.Test;
+
+import java.util.Map;
+import java.util.Random;
+
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.powermock.api.mockito.PowerMockito.spy;
+
+public class IncrementalKeyedStateHandleTest {
+
+   /**
+* This test checks, that for an unregistered {@link 
IncrementalKeyedStateHandle} all state
+* (including shared) is discarded.
+*/
+   @Test
+   public void testUnregisteredDiscarding() throws Exception {
+   IncrementalKeyedStateHandle stateHandle = create(new 
Random(42));
+
+   stateHandle.discardState();
+
+   for (StreamStateHandle handle : 
stateHandle.getPrivateState().values()) {
+   verify(handle).discardState();
+   }
+
+   for (StreamStateHandle handle : 
stateHandle.getSharedState().values()) {
+   verify(handle).discardState();
+   }
+
+   verify(stateHandle.getMetaStateHandle()).discardState();
+   }
+
+   /**
+* This test checks, that for a registered {@link 
IncrementalKeyedStateHandle} discards respect
+* all shared state and only discard it one all references are released.
+*/
+   @Test
+   public void testSharedStateDeRegistration() throws Exception {
+
+   Random rnd = new Random(42);
+
+   SharedStateRegistry registry = spy(new SharedStateRegistry());
+
+   // Create two state handles with overlapping shared state
+   IncrementalKeyedStateHandle stateHandle1 = create(new 
Random(42));
+   IncrementalKeyedStateHandle stateHandle2 = create(new 
Random(42));
+
+   // Both handles should not be registered and not discarded by 
now.
+   for (Map.Entry entry :
+   stateHandle1.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+   verify(registry, 
times(0)).unregisterReference(registryKey);
+   verify(entry.getValue(), times(0)).discardState();
+   }
+
+   for (Map.Entry entry :
+   stateHandle2.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(entry.getKey());
+
+   verify(registry, 
times(0)).unregisterReference(registryKey);
+   verify(entry.getValue(), times(0)).discardState();
+   }
+
+   // Now we register both ...
+   stateHandle1.registerSharedStates(registry);
+   stateHandle2.registerSharedStates(registry);
+
+   for (Map.Entry 
stateHandleEntry :
+   stateHandle1.getSharedState().entrySet()) {
+
+   SharedStateRegistryKey registryKey =
+   
stateHandle1.createSharedStateRegistryKeyFromFileName(stateHandleEntry.getKey());
+
+   verify(registry).registerReference(
+   registryKey,
+   stateHandleEntry.getVa

[3/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

2017-05-19 Thread srichter
http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
--
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
new file mode 100644
index 000..0c215cd
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreTest.java
@@ -0,0 +1,805 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.zookeeper;
+
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
+import org.apache.flink.runtime.concurrent.Executors;
+import org.apache.flink.runtime.state.RetrievableStateHandle;
+import org.apache.flink.runtime.util.ZooKeeperUtils;
+import org.apache.flink.util.InstantiationUtil;
+import org.apache.flink.util.TestLogger;
+import org.apache.zookeeper.data.Stat;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+/**
+ * Tests for basic {@link ZooKeeperStateHandleStore} behaviour.
+ *
+ *  Tests include:
+ * 
+ * Expected usage of operations
+ * Correct ordering of ZooKeeper and state handle operations
+ * 
+ */
+public class ZooKeeperStateHandleStoreTest extends TestLogger {
+
+   private static final ZooKeeperTestEnvironment ZOOKEEPER = new 
ZooKeeperTestEnvironment(1);
+
+   @AfterClass
+   public static void tearDown() throws Exception {
+   if (ZOOKEEPER != null) {
+   ZOOKEEPER.shutdown();
+   }
+   }
+
+   @Before
+   public void cleanUp() throws Exception {
+   ZOOKEEPER.deleteAll();
+   }
+
+   /**
+* Tests add operation with lock.
+*/
+   @Test
+   public void testAddAndLock() throws Exception {
+   LongStateStorage longStateStorage = new LongStateStorage();
+   ZooKeeperStateHandleStore store = new 
ZooKeeperStateHandleStore(
+   ZOOKEEPER.getClient(), longStateStorage, 
Executors.directExecutor());
+
+   // Config
+   final String pathInZooKeeper = "/testAdd";
+   final Long state = 1239712317L;
+
+   // Test
+   store.addAndLock(pathInZooKeeper, state);
+
+   // Verify
+   // State handle created
+   assertEquals(1, store.getAllAndLock().size());
+   assertEquals(state, 
store.getAndLock(pathInZooKeeper).retrieveState());
+
+   // Path created and is persistent
+   Stat stat = 
ZOOKEEPER.getClient().checkExists().forPath(pathInZooKeeper);
+   assertNotNull(stat);
+   assertEquals(0, stat.getEphemeralOwner());
+
+   List children = 
ZOOKEEPER.getClient().getChildren().forPath(pathInZooKeeper);
+
+   // there should be one child which is the lock
+   assertEquals(1, children.size());
+
+   stat = 
ZOOKEEPER.getClien

[4/4] flink git commit: [FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

2017-05-19 Thread srichter
[FLINK-6612] Allow ZooKeeperStateHandleStore to lock created ZNodes

In order to guard against deletions of ZooKeeper nodes which are still being 
used
by a different ZooKeeperStateHandleStore, we have to introduce a locking 
mechanism.
Only after all ZooKeeperStateHandleStores have released their lock, the ZNode is
allowed to be deleted.

THe locking mechanism is implemented via ephemeral child nodes of the respective
ZooKeeper node. Whenever a ZooKeeperStateHandleStore wants to lock a ZNode, 
thus,
protecting it from being deleted, it creates an ephemeral child node. The node's
name is unique to the ZooKeeperStateHandleStore instance. The delete operations
will then only delete the node if it does not have any children associated.

In order to guard against oprhaned lock nodes, they are created as ephemeral 
nodes.
This means that they will be deleted by ZooKeeper once the connection of the
ZooKeeper client which created the node timed out.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/3d119e11
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/3d119e11
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/3d119e11

Branch: refs/heads/master
Commit: 3d119e1155aa8930cc7b18a085d6790cb2c63b70
Parents: b8f8524
Author: Till Rohrmann 
Authored: Wed May 17 14:52:04 2017 +0200
Committer: Stefan Richter 
Committed: Fri May 19 10:57:32 2017 +0200

--
 .../store/ZooKeeperMesosWorkerStore.java|   8 +-
 .../ZooKeeperCompletedCheckpointStore.java  | 150 ++--
 .../ZooKeeperSubmittedJobGraphStore.java|  50 +-
 .../zookeeper/ZooKeeperStateHandleStore.java| 419 +++---
 .../CompletedCheckpointStoreTest.java   |   9 +
 ...ZooKeeperCompletedCheckpointStoreITCase.java | 133 ++-
 .../ZooKeeperCompletedCheckpointStoreTest.java  |  11 +-
 .../ZooKeeperStateHandleStoreITCase.java| 642 ---
 .../ZooKeeperStateHandleStoreTest.java  | 805 +++
 9 files changed, 1345 insertions(+), 882 deletions(-)
--


http://git-wip-us.apache.org/repos/asf/flink/blob/3d119e11/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
--
diff --git 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
index 42abd4c..663ce56 100644
--- 
a/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
+++ 
b/flink-mesos/src/main/java/org/apache/flink/mesos/runtime/clusterframework/store/ZooKeeperMesosWorkerStore.java
@@ -88,7 +88,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
totalTaskCountInZooKeeper.close();
 
if(cleanup) {
-   
workersInZooKeeper.removeAndDiscardAllState();
+   
workersInZooKeeper.releaseAndTryRemoveAll();
}
 
isRunning = false;
@@ -169,7 +169,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
synchronized (startStopLock) {
verifyIsRunning();
 
-   List, String>> 
handles = workersInZooKeeper.getAll();
+   List, String>> 
handles = workersInZooKeeper.getAllAndLock();
 
if(handles.size() != 0) {
List workers = new 
ArrayList<>(handles.size());
@@ -199,7 +199,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
int currentVersion = workersInZooKeeper.exists(path);
if (currentVersion == -1) {
try {
-   workersInZooKeeper.add(path, worker);
+   workersInZooKeeper.addAndLock(path, 
worker);
LOG.debug("Added {} in ZooKeeper.", 
worker);
} catch (KeeperException.NodeExistsException 
ex) {
throw new 
ConcurrentModificationException("ZooKeeper unexpectedly modified", ex);
@@ -227,7 +227,7 @@ public class ZooKeeperMesosWorkerStore implements 
MesosWorkerStore {
return false;
}
 
-   workersInZooKeeper.removeAndDiscardState(path);
+   workersInZooKeeper.releaseAndTryRemove(path);
LOG.debug("Removed worker {} from ZooKeeper.", taskID);