Repository: flink
Updated Branches:
  refs/heads/release-1.3 0225db288 -> 09caa9ffd


http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
index 77423c2..dc2b11e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java
@@ -18,13 +18,14 @@
 
 package org.apache.flink.runtime.checkpoint;
 
-import org.apache.curator.framework.CuratorFramework;
 import org.apache.flink.runtime.concurrent.Executors;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.state.RetrievableStateHandle;
 import org.apache.flink.runtime.state.SharedStateRegistry;
 import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper;
 import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment;
+
+import org.apache.curator.framework.CuratorFramework;
 import org.apache.zookeeper.data.Stat;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
 
                // Recover
-               sharedStateRegistry.clear();
-               checkpoints.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               sharedStateRegistry = new SharedStateRegistry();
+               checkpoints.recover();
 
                assertEquals(3, 
ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size());
                assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints());
@@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
                assertNull(client.checkExists().forPath(CHECKPOINT_PATH + 
ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID())));
 
-               sharedStateRegistry.clear();
-               store.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               store.recover();
 
                assertEquals(0, store.getNumberOfRetainedCheckpoints());
        }
@@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                assertEquals("The checkpoint node should not be locked.", 0, 
stat.getNumChildren());
 
                // Recover again
-               sharedStateRegistry.clear();
-               store.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               store.recover();
 
                CompletedCheckpoint recovered = store.getLatestCheckpoint();
                assertEquals(checkpoint, recovered);
@@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                        checkpointStore.addCheckpoint(checkpoint);
                }
 
-               sharedStateRegistry.clear();
-               checkpointStore.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               checkpointStore.recover();
 
                CompletedCheckpoint latestCheckpoint = 
checkpointStore.getLatestCheckpoint();
 
@@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase 
extends CompletedCheckpoint
                zkCheckpointStore1.addCheckpoint(completedCheckpoint);
 
                // recover the checkpoint by a different checkpoint store
-               sharedStateRegistry.clear();
-               zkCheckpointStore2.recover(sharedStateRegistry);
+               sharedStateRegistry.close();
+               sharedStateRegistry = new SharedStateRegistry();
+               zkCheckpointStore2.recover();
 
                CompletedCheckpoint recoveredCheckpoint = 
zkCheckpointStore2.getLatestCheckpoint();
                assertTrue(recoveredCheckpoint instanceof 
TestCompletedCheckpoint);

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
index 91bab85..3171f1f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java
@@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
@@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends 
TestLogger {
                        stateStorage,
                        Executors.directExecutor());
 
-               SharedStateRegistry sharedStateRegistry = spy(new 
SharedStateRegistry());
-               zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry);
-
-               verify(retrievableStateHandle1.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
-               verify(retrievableStateHandle2.retrieveState(), 
times(1)).registerSharedStatesAfterRestored(sharedStateRegistry);
+               zooKeeperCompletedCheckpointStore.recover();
 
                CompletedCheckpoint latestCompletedCheckpoint = 
zooKeeperCompletedCheckpointStore.getLatestCheckpoint();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
index c1b3ccd..9f6f88e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java
@@ -19,12 +19,15 @@
 package org.apache.flink.runtime.state;
 
 import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils;
+
 import org.junit.Test;
 
 import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.powermock.api.mockito.PowerMockito.spy;
@@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest {
        @Test
        public void testSharedStateDeRegistration() throws Exception {
 
-               Random rnd = new Random(42);
-
                SharedStateRegistry registry = spy(new SharedStateRegistry());
 
                // Create two state handles with overlapping shared state
@@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest {
                verify(stateHandle2.getMetaStateHandle(), 
times(1)).discardState();
        }
 
+       /**
+        * This tests that re-registration of shared state with another 
registry works as expected. This simulates a
+        * recovery from a checkpoint, when the checkpoint coordinator creates 
a new shared state registry and re-registers
+        * all live checkpoint states.
+        */
+       @Test
+       public void testSharedStateReRegistration() throws Exception {
+
+               SharedStateRegistry stateRegistryA = spy(new 
SharedStateRegistry());
+
+               IncrementalKeyedStateHandle stateHandleX = create(new 
Random(1));
+               IncrementalKeyedStateHandle stateHandleY = create(new 
Random(2));
+               IncrementalKeyedStateHandle stateHandleZ = create(new 
Random(3));
+
+               // Now we register first time ...
+               stateHandleX.registerSharedStates(stateRegistryA);
+               stateHandleY.registerSharedStates(stateRegistryA);
+               stateHandleZ.registerSharedStates(stateRegistryA);
+
+               try {
+                       // Second attempt should fail
+                       stateHandleX.registerSharedStates(stateRegistryA);
+                       fail("Should not be able to register twice with the 
same registry.");
+               } catch (IllegalStateException ignore) {
+               }
+
+               // Everything should be discarded for this handle
+               stateHandleZ.discardState();
+               verify(stateHandleZ.getMetaStateHandle(), 
times(1)).discardState();
+               for (StreamStateHandle stateHandle : 
stateHandleZ.getSharedState().values()) {
+                       verify(stateHandle, times(1)).discardState();
+               }
+
+               // Close the first registry
+               stateRegistryA.close();
+
+               // Attempt to register to closed registry should trigger 
exception
+               try {
+                       create(new 
Random(4)).registerSharedStates(stateRegistryA);
+                       fail("Should not be able to register new state to 
closed registry.");
+               } catch (IllegalStateException ignore) {
+               }
+
+               // All state should still get discarded
+               stateHandleY.discardState();
+               verify(stateHandleY.getMetaStateHandle(), 
times(1)).discardState();
+               for (StreamStateHandle stateHandle : 
stateHandleY.getSharedState().values()) {
+                       verify(stateHandle, times(1)).discardState();
+               }
+
+               // This should still be unaffected
+               verify(stateHandleX.getMetaStateHandle(), 
never()).discardState();
+               for (StreamStateHandle stateHandle : 
stateHandleX.getSharedState().values()) {
+                       verify(stateHandle, never()).discardState();
+               }
+
+               // We re-register the handle with a new registry
+               SharedStateRegistry sharedStateRegistryB = spy(new 
SharedStateRegistry());
+               stateHandleX.registerSharedStates(sharedStateRegistryB);
+               stateHandleX.discardState();
+
+               // Should be completely discarded because it is tracked through 
the new registry
+               verify(stateHandleX.getMetaStateHandle(), 
times(1)).discardState();
+               for (StreamStateHandle stateHandle : 
stateHandleX.getSharedState().values()) {
+                       verify(stateHandle, times(1)).discardState();
+               }
+
+               sharedStateRegistryB.close();
+       }
+
        private static IncrementalKeyedStateHandle create(Random rnd) {
                return new IncrementalKeyedStateHandle(
                        UUID.nameUUIDFromBytes("test".getBytes()),

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
index a0c4412..037ecd1 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java
@@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore;
 import org.apache.flink.runtime.jobgraph.JobStatus;
-import org.apache.flink.runtime.state.SharedStateRegistry;
+import org.apache.flink.util.Preconditions;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointS
 
        private final ArrayDeque<CompletedCheckpoint> suspended = new 
ArrayDeque<>(2);
 
+       private final int maxRetainedCheckpoints;
+
+       public RecoverableCompletedCheckpointStore() {
+               this(1);
+       }
+
+       public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) {
+               Preconditions.checkArgument(maxRetainedCheckpoints > 0);
+               this.maxRetainedCheckpoints = maxRetainedCheckpoints;
+       }
+
        @Override
-       public void recover(SharedStateRegistry sharedStateRegistry) throws 
Exception {
+       public void recover() throws Exception {
                checkpoints.addAll(suspended);
                suspended.clear();
-
-               for (CompletedCheckpoint checkpoint : checkpoints) {
-                       
checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry);
-               }
        }
 
        @Override
@@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointS
 
                checkpoints.addLast(checkpoint);
 
-
-               if (checkpoints.size() > 1) {
-                       CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
-                       checkpointToSubsume.discardOnSubsume();
+               if (checkpoints.size() > maxRetainedCheckpoints) {
+                       removeOldestCheckpoint();
                }
        }
 
+       public void removeOldestCheckpoint() throws Exception {
+               CompletedCheckpoint checkpointToSubsume = 
checkpoints.removeFirst();
+               checkpointToSubsume.discardOnSubsume();
+       }
+
        @Override
        public CompletedCheckpoint getLatestCheckpoint() throws Exception {
                return checkpoints.isEmpty() ? null : checkpoints.getLast();
@@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements 
CompletedCheckpointS
 
        @Override
        public int getMaxNumberOfRetainedCheckpoints() {
-               return 1;
+               return maxRetainedCheckpoints;
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 3ce5a14..fb5f0e7 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -837,7 +837,6 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
        //  Utilities
        // 
------------------------------------------------------------------------
 
-
        @Override
        public String toString() {
                return getName();

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
index 6ad7708..7c38d8d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java
@@ -27,9 +27,12 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.TaskManagerOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
 import org.apache.flink.core.fs.Path;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils;
 import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster;
 import org.apache.flink.runtime.state.AbstractStateBackend;
 import org.apache.flink.runtime.state.CheckpointListener;
@@ -47,19 +50,25 @@ import 
org.apache.flink.streaming.util.TestStreamEnvironment;
 import org.apache.flink.test.util.SuccessException;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.TestLogger;
-import org.junit.AfterClass;
+
+import org.apache.curator.test.TestingServer;
+import org.junit.After;
 import org.junit.Before;
-import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import static 
org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK;
 import static org.apache.flink.test.util.TestUtils.tryExecute;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
@@ -84,22 +93,35 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
        private static TestStreamEnvironment env;
 
+       private static TestingServer zkServer;
+
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
        private StateBackendEnum stateBackendEnum;
        private AbstractStateBackend stateBackend;
 
-       AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) {
+       AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum 
stateBackendEnum) throws IOException {
                this.stateBackendEnum = stateBackendEnum;
        }
 
        enum StateBackendEnum {
-               MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, 
FILE_ASYNC
+               MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, 
ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC
        }
 
-       @BeforeClass
-       public static void startTestCluster() {
+       @Before
+       public void startTestCluster() throws Exception {
+
+               // Testing HA Scenario / ZKCompletedCheckpointStore with 
incremental checkpoints
+               if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) {
+                       zkServer = new TestingServer();
+                       zkServer.start();
+               }
+
+               TemporaryFolder temporaryFolder = new TemporaryFolder();
+               temporaryFolder.create();
+               final File haDir = temporaryFolder.newFolder();
+
                Configuration config = new Configuration();
                config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2);
                config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, 
PARALLELISM / 2);
@@ -107,21 +129,28 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                // the default network buffers size (10% of heap max =~ 150MB) 
seems to much for this test case
                config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 
80L << 20); // 80 MB
 
-               cluster = new LocalFlinkMiniCluster(config, false);
+               if (zkServer != null) {
+                       config.setString(HighAvailabilityOptions.HA_MODE, 
"ZOOKEEPER");
+                       
config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, 
zkServer.getConnectString());
+                       
config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, 
haDir.toURI().toString());
+               }
+
+               // purposefully delay in the executor to tease out races
+               final ScheduledExecutorService executor = 
Executors.newScheduledThreadPool(10);
+               HighAvailabilityServices haServices = 
HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices(
+                       config,
+                       new Executor() {
+                               @Override
+                               public void execute(Runnable command) {
+                                       executor.schedule(command, 500, 
MILLISECONDS);
+                               }
+                       });
+
+               cluster = new LocalFlinkMiniCluster(config, haServices, false);
                cluster.start();
 
                env = new TestStreamEnvironment(cluster, PARALLELISM);
-       }
-
-       @AfterClass
-       public static void stopTestCluster() {
-               if (cluster != null) {
-                       cluster.stop();
-               }
-       }
 
-       @Before
-       public void initStateBackend() throws IOException {
                switch (stateBackendEnum) {
                        case MEM:
                                this.stateBackend = new 
MemoryStateBackend(MAX_MEM_STATE_SIZE, false);
@@ -146,7 +175,8 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                this.stateBackend = rdb;
                                break;
                        }
-                       case ROCKSDB_INCREMENTAL: {
+                       case ROCKSDB_INCREMENTAL:
+                       case ROCKSDB_INCREMENTAL_ZK: {
                                String rocksDb = 
tempFolder.newFolder().getAbsolutePath();
                                String backups = 
tempFolder.newFolder().getAbsolutePath();
                                // we use the fs backend with small threshold 
here to test the behaviour with file
@@ -160,7 +190,21 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                this.stateBackend = rdb;
                                break;
                        }
+                       default:
+                               throw new IllegalStateException("No backend 
selected.");
+               }
+       }
+
+       @After
+       public void stopTestCluster() throws IOException {
+               if (cluster != null) {
+                       cluster.stop();
+                       cluster = null;
+               }
 
+               if (zkServer != null) {
+                       zkServer.stop();
+                       zkServer = null;
                }
        }
 
@@ -172,7 +216,7 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                final int WINDOW_SIZE = windowSize();
                final int NUM_KEYS = numKeys();
                FailingSource.reset();
-               
+
                try {
                        env.setParallelism(PARALLELISM);
                        
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
@@ -506,7 +550,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                }
        }
 
-
        // 
------------------------------------------------------------------------
        //  Utilities
        // 
------------------------------------------------------------------------
@@ -667,7 +710,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
 
                        assertEquals("Window start: " + value.f1 + " end: " + 
value.f2, expectedSum, value.f3.value);
 
-
                        Integer curr = windowCounts.get(value.f0);
                        if (curr != null) {
                                windowCounts.put(value.f0, curr + 1);
@@ -754,7 +796,6 @@ public abstract class 
AbstractEventTimeWindowCheckpointingITCase extends TestLog
                                windowCounts.put(value.f0, 1);
                        }
 
-
                        // verify the contents of that window, the contents 
should be:
                        // (key + num windows so far)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
index a5bf10c..9abbddd 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public AsyncFileBackendEventTimeWindowCheckpointingITCase() {
+       public AsyncFileBackendEventTimeWindowCheckpointingITCase() throws 
IOException {
                super(StateBackendEnum.FILE_ASYNC);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
index ef9ad37..62041a5 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public AsyncMemBackendEventTimeWindowCheckpointingITCase() {
+       public AsyncMemBackendEventTimeWindowCheckpointingITCase() throws 
IOException {
                super(StateBackendEnum.MEM_ASYNC);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
index 65fda09..3111f05 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class FileBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public FileBackendEventTimeWindowCheckpointingITCase() {
+       public FileBackendEventTimeWindowCheckpointingITCase() throws 
IOException {
                super(StateBackendEnum.FILE);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
new file mode 100644
index 0000000..8e23909
--- /dev/null
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.checkpointing;
+
+import java.io.IOException;
+
+/**
+ * Integration tests for incremental RocksDB backend.
+ */
+public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase 
extends AbstractEventTimeWindowCheckpointingITCase {
+
+       public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() 
throws IOException {
+               super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK);
+       }
+
+       @Override
+       protected int numElementsPerKey() {
+               return 3000;
+       }
+
+       @Override
+       protected int windowSize() {
+               return 1000;
+       }
+
+       @Override
+       protected int windowSlide() {
+               return 100;
+       }
+
+       @Override
+       protected int numKeys() {
+               return 100;
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
index 352f9f7..2cdfbe7 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase 
extends AbstractEventTimeWindowCheckpointingITCase {
 
-       public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() {
+       public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() 
throws IOException {
                super(StateBackendEnum.ROCKSDB_INCREMENTAL);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
index 899b8d6..701b746 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class MemBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public MemBackendEventTimeWindowCheckpointingITCase() {
+       public MemBackendEventTimeWindowCheckpointingITCase() throws 
IOException {
                super(StateBackendEnum.MEM);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
----------------------------------------------------------------------
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
index da2bbc7..b7cbaa9 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java
@@ -18,9 +18,11 @@
 
 package org.apache.flink.test.checkpointing;
 
+import java.io.IOException;
+
 public class RocksDbBackendEventTimeWindowCheckpointingITCase extends 
AbstractEventTimeWindowCheckpointingITCase {
 
-       public RocksDbBackendEventTimeWindowCheckpointingITCase() {
+       public RocksDbBackendEventTimeWindowCheckpointingITCase() throws 
IOException {
                super(StateBackendEnum.ROCKSDB_FULLY_ASYNC);
        }
 

Reply via email to