Repository: flink Updated Branches: refs/heads/release-1.3 0225db288 -> 09caa9ffd
http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java index 77423c2..dc2b11e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreITCase.java @@ -18,13 +18,14 @@ package org.apache.flink.runtime.checkpoint; -import org.apache.curator.framework.CuratorFramework; import org.apache.flink.runtime.concurrent.Executors; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.runtime.state.SharedStateRegistry; import org.apache.flink.runtime.zookeeper.RetrievableStateStorageHelper; import org.apache.flink.runtime.zookeeper.ZooKeeperTestEnvironment; + +import org.apache.curator.framework.CuratorFramework; import org.apache.zookeeper.data.Stat; import org.junit.AfterClass; import org.junit.Before; @@ -106,8 +107,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); // Recover - sharedStateRegistry.clear(); - checkpoints.recover(sharedStateRegistry); + sharedStateRegistry.close(); + sharedStateRegistry = new SharedStateRegistry(); + checkpoints.recover(); assertEquals(3, ZOOKEEPER.getClient().getChildren().forPath(CHECKPOINT_PATH).size()); assertEquals(3, checkpoints.getNumberOfRetainedCheckpoints()); @@ -148,8 +150,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals(0, store.getNumberOfRetainedCheckpoints()); assertNull(client.checkExists().forPath(CHECKPOINT_PATH + ZooKeeperCompletedCheckpointStore.checkpointIdToPath(checkpoint.getCheckpointID()))); - sharedStateRegistry.clear(); - store.recover(sharedStateRegistry); + sharedStateRegistry.close(); + store.recover(); assertEquals(0, store.getNumberOfRetainedCheckpoints()); } @@ -182,8 +184,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint assertEquals("The checkpoint node should not be locked.", 0, stat.getNumChildren()); // Recover again - sharedStateRegistry.clear(); - store.recover(sharedStateRegistry); + sharedStateRegistry.close(); + store.recover(); CompletedCheckpoint recovered = store.getLatestCheckpoint(); assertEquals(checkpoint, recovered); @@ -209,8 +211,8 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint checkpointStore.addCheckpoint(checkpoint); } - sharedStateRegistry.clear(); - checkpointStore.recover(sharedStateRegistry); + sharedStateRegistry.close(); + checkpointStore.recover(); CompletedCheckpoint latestCheckpoint = checkpointStore.getLatestCheckpoint(); @@ -239,8 +241,9 @@ public class ZooKeeperCompletedCheckpointStoreITCase extends CompletedCheckpoint zkCheckpointStore1.addCheckpoint(completedCheckpoint); // recover the checkpoint by a different checkpoint store - sharedStateRegistry.clear(); - zkCheckpointStore2.recover(sharedStateRegistry); + sharedStateRegistry.close(); + sharedStateRegistry = new SharedStateRegistry(); + zkCheckpointStore2.recover(); CompletedCheckpoint recoveredCheckpoint = zkCheckpointStore2.getLatestCheckpoint(); assertTrue(recoveredCheckpoint instanceof TestCompletedCheckpoint); http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java index 91bab85..3171f1f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/ZooKeeperCompletedCheckpointStoreTest.java @@ -52,7 +52,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyCollection; import static org.mockito.Matchers.anyString; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; @@ -162,11 +161,7 @@ public class ZooKeeperCompletedCheckpointStoreTest extends TestLogger { stateStorage, Executors.directExecutor()); - SharedStateRegistry sharedStateRegistry = spy(new SharedStateRegistry()); - zooKeeperCompletedCheckpointStore.recover(sharedStateRegistry); - - verify(retrievableStateHandle1.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); - verify(retrievableStateHandle2.retrieveState(), times(1)).registerSharedStatesAfterRestored(sharedStateRegistry); + zooKeeperCompletedCheckpointStore.recover(); CompletedCheckpoint latestCompletedCheckpoint = zooKeeperCompletedCheckpointStore.getLatestCheckpoint(); http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java index c1b3ccd..9f6f88e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/IncrementalKeyedStateHandleTest.java @@ -19,12 +19,15 @@ package org.apache.flink.runtime.state; import org.apache.flink.runtime.checkpoint.savepoint.CheckpointTestUtils; + import org.junit.Test; import java.util.Map; import java.util.Random; import java.util.UUID; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.powermock.api.mockito.PowerMockito.spy; @@ -59,8 +62,6 @@ public class IncrementalKeyedStateHandleTest { @Test public void testSharedStateDeRegistration() throws Exception { - Random rnd = new Random(42); - SharedStateRegistry registry = spy(new SharedStateRegistry()); // Create two state handles with overlapping shared state @@ -186,6 +187,76 @@ public class IncrementalKeyedStateHandleTest { verify(stateHandle2.getMetaStateHandle(), times(1)).discardState(); } + /** + * This tests that re-registration of shared state with another registry works as expected. This simulates a + * recovery from a checkpoint, when the checkpoint coordinator creates a new shared state registry and re-registers + * all live checkpoint states. + */ + @Test + public void testSharedStateReRegistration() throws Exception { + + SharedStateRegistry stateRegistryA = spy(new SharedStateRegistry()); + + IncrementalKeyedStateHandle stateHandleX = create(new Random(1)); + IncrementalKeyedStateHandle stateHandleY = create(new Random(2)); + IncrementalKeyedStateHandle stateHandleZ = create(new Random(3)); + + // Now we register first time ... + stateHandleX.registerSharedStates(stateRegistryA); + stateHandleY.registerSharedStates(stateRegistryA); + stateHandleZ.registerSharedStates(stateRegistryA); + + try { + // Second attempt should fail + stateHandleX.registerSharedStates(stateRegistryA); + fail("Should not be able to register twice with the same registry."); + } catch (IllegalStateException ignore) { + } + + // Everything should be discarded for this handle + stateHandleZ.discardState(); + verify(stateHandleZ.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleZ.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + // Close the first registry + stateRegistryA.close(); + + // Attempt to register to closed registry should trigger exception + try { + create(new Random(4)).registerSharedStates(stateRegistryA); + fail("Should not be able to register new state to closed registry."); + } catch (IllegalStateException ignore) { + } + + // All state should still get discarded + stateHandleY.discardState(); + verify(stateHandleY.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleY.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + // This should still be unaffected + verify(stateHandleX.getMetaStateHandle(), never()).discardState(); + for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) { + verify(stateHandle, never()).discardState(); + } + + // We re-register the handle with a new registry + SharedStateRegistry sharedStateRegistryB = spy(new SharedStateRegistry()); + stateHandleX.registerSharedStates(sharedStateRegistryB); + stateHandleX.discardState(); + + // Should be completely discarded because it is tracked through the new registry + verify(stateHandleX.getMetaStateHandle(), times(1)).discardState(); + for (StreamStateHandle stateHandle : stateHandleX.getSharedState().values()) { + verify(stateHandle, times(1)).discardState(); + } + + sharedStateRegistryB.close(); + } + private static IncrementalKeyedStateHandle create(Random rnd) { return new IncrementalKeyedStateHandle( UUID.nameUUIDFromBytes("test".getBytes()), http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java index a0c4412..037ecd1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/RecoverableCompletedCheckpointStore.java @@ -21,7 +21,8 @@ package org.apache.flink.runtime.testutils; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.CompletedCheckpointStore; import org.apache.flink.runtime.jobgraph.JobStatus; -import org.apache.flink.runtime.state.SharedStateRegistry; +import org.apache.flink.util.Preconditions; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,14 +42,21 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS private final ArrayDeque<CompletedCheckpoint> suspended = new ArrayDeque<>(2); + private final int maxRetainedCheckpoints; + + public RecoverableCompletedCheckpointStore() { + this(1); + } + + public RecoverableCompletedCheckpointStore(int maxRetainedCheckpoints) { + Preconditions.checkArgument(maxRetainedCheckpoints > 0); + this.maxRetainedCheckpoints = maxRetainedCheckpoints; + } + @Override - public void recover(SharedStateRegistry sharedStateRegistry) throws Exception { + public void recover() throws Exception { checkpoints.addAll(suspended); suspended.clear(); - - for (CompletedCheckpoint checkpoint : checkpoints) { - checkpoint.registerSharedStatesAfterRestored(sharedStateRegistry); - } } @Override @@ -56,13 +64,16 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS checkpoints.addLast(checkpoint); - - if (checkpoints.size() > 1) { - CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); - checkpointToSubsume.discardOnSubsume(); + if (checkpoints.size() > maxRetainedCheckpoints) { + removeOldestCheckpoint(); } } + public void removeOldestCheckpoint() throws Exception { + CompletedCheckpoint checkpointToSubsume = checkpoints.removeFirst(); + checkpointToSubsume.discardOnSubsume(); + } + @Override public CompletedCheckpoint getLatestCheckpoint() throws Exception { return checkpoints.isEmpty() ? null : checkpoints.getLast(); @@ -96,7 +107,7 @@ public class RecoverableCompletedCheckpointStore implements CompletedCheckpointS @Override public int getMaxNumberOfRetainedCheckpoints() { - return 1; + return maxRetainedCheckpoints; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 3ce5a14..fb5f0e7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -837,7 +837,6 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> // Utilities // ------------------------------------------------------------------------ - @Override public String toString() { return getName(); http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java index 6ad7708..7c38d8d 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AbstractEventTimeWindowCheckpointingITCase.java @@ -27,9 +27,12 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.HighAvailabilityOptions; import org.apache.flink.configuration.TaskManagerOptions; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServicesUtils; import org.apache.flink.runtime.minicluster.LocalFlinkMiniCluster; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.CheckpointListener; @@ -47,19 +50,25 @@ import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.SuccessException; import org.apache.flink.util.Collector; import org.apache.flink.util.TestLogger; -import org.junit.AfterClass; + +import org.apache.curator.test.TestingServer; +import org.junit.After; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Rule; import org.junit.Test; import org.junit.rules.TemporaryFolder; +import java.io.File; import java.io.IOException; import java.util.Collections; import java.util.HashMap; import java.util.List; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.apache.flink.test.checkpointing.AbstractEventTimeWindowCheckpointingITCase.StateBackendEnum.ROCKSDB_INCREMENTAL_ZK; import static org.apache.flink.test.util.TestUtils.tryExecute; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; @@ -84,22 +93,35 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog private static TestStreamEnvironment env; + private static TestingServer zkServer; + @Rule public TemporaryFolder tempFolder = new TemporaryFolder(); private StateBackendEnum stateBackendEnum; private AbstractStateBackend stateBackend; - AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) { + AbstractEventTimeWindowCheckpointingITCase(StateBackendEnum stateBackendEnum) throws IOException { this.stateBackendEnum = stateBackendEnum; } enum StateBackendEnum { - MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, MEM_ASYNC, FILE_ASYNC + MEM, FILE, ROCKSDB_FULLY_ASYNC, ROCKSDB_INCREMENTAL, ROCKSDB_INCREMENTAL_ZK, MEM_ASYNC, FILE_ASYNC } - @BeforeClass - public static void startTestCluster() { + @Before + public void startTestCluster() throws Exception { + + // Testing HA Scenario / ZKCompletedCheckpointStore with incremental checkpoints + if (ROCKSDB_INCREMENTAL_ZK.equals(stateBackendEnum)) { + zkServer = new TestingServer(); + zkServer.start(); + } + + TemporaryFolder temporaryFolder = new TemporaryFolder(); + temporaryFolder.create(); + final File haDir = temporaryFolder.newFolder(); + Configuration config = new Configuration(); config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, 2); config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM / 2); @@ -107,21 +129,28 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog // the default network buffers size (10% of heap max =~ 150MB) seems to much for this test case config.setLong(TaskManagerOptions.NETWORK_BUFFERS_MEMORY_MAX, 80L << 20); // 80 MB - cluster = new LocalFlinkMiniCluster(config, false); + if (zkServer != null) { + config.setString(HighAvailabilityOptions.HA_MODE, "ZOOKEEPER"); + config.setString(HighAvailabilityOptions.HA_ZOOKEEPER_QUORUM, zkServer.getConnectString()); + config.setString(HighAvailabilityOptions.HA_STORAGE_PATH, haDir.toURI().toString()); + } + + // purposefully delay in the executor to tease out races + final ScheduledExecutorService executor = Executors.newScheduledThreadPool(10); + HighAvailabilityServices haServices = HighAvailabilityServicesUtils.createAvailableOrEmbeddedServices( + config, + new Executor() { + @Override + public void execute(Runnable command) { + executor.schedule(command, 500, MILLISECONDS); + } + }); + + cluster = new LocalFlinkMiniCluster(config, haServices, false); cluster.start(); env = new TestStreamEnvironment(cluster, PARALLELISM); - } - - @AfterClass - public static void stopTestCluster() { - if (cluster != null) { - cluster.stop(); - } - } - @Before - public void initStateBackend() throws IOException { switch (stateBackendEnum) { case MEM: this.stateBackend = new MemoryStateBackend(MAX_MEM_STATE_SIZE, false); @@ -146,7 +175,8 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog this.stateBackend = rdb; break; } - case ROCKSDB_INCREMENTAL: { + case ROCKSDB_INCREMENTAL: + case ROCKSDB_INCREMENTAL_ZK: { String rocksDb = tempFolder.newFolder().getAbsolutePath(); String backups = tempFolder.newFolder().getAbsolutePath(); // we use the fs backend with small threshold here to test the behaviour with file @@ -160,7 +190,21 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog this.stateBackend = rdb; break; } + default: + throw new IllegalStateException("No backend selected."); + } + } + + @After + public void stopTestCluster() throws IOException { + if (cluster != null) { + cluster.stop(); + cluster = null; + } + if (zkServer != null) { + zkServer.stop(); + zkServer = null; } } @@ -172,7 +216,7 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog final int WINDOW_SIZE = windowSize(); final int NUM_KEYS = numKeys(); FailingSource.reset(); - + try { env.setParallelism(PARALLELISM); env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); @@ -506,7 +550,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog } } - // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ @@ -667,7 +710,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog assertEquals("Window start: " + value.f1 + " end: " + value.f2, expectedSum, value.f3.value); - Integer curr = windowCounts.get(value.f0); if (curr != null) { windowCounts.put(value.f0, curr + 1); @@ -754,7 +796,6 @@ public abstract class AbstractEventTimeWindowCheckpointingITCase extends TestLog windowCounts.put(value.f0, 1); } - // verify the contents of that window, the contents should be: // (key + num windows so far) http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java index a5bf10c..9abbddd 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncFileBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class AsyncFileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public AsyncFileBackendEventTimeWindowCheckpointingITCase() { + public AsyncFileBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.FILE_ASYNC); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java index ef9ad37..62041a5 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/AsyncMemBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class AsyncMemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public AsyncMemBackendEventTimeWindowCheckpointingITCase() { + public AsyncMemBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.MEM_ASYNC); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java index 65fda09..3111f05 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/FileBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class FileBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public FileBackendEventTimeWindowCheckpointingITCase() { + public FileBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.FILE); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java new file mode 100644 index 0000000..8e23909 --- /dev/null +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.test.checkpointing; + +import java.io.IOException; + +/** + * Integration tests for incremental RocksDB backend. + */ +public class HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { + + public HAIncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException { + super(StateBackendEnum.ROCKSDB_INCREMENTAL_ZK); + } + + @Override + protected int numElementsPerKey() { + return 3000; + } + + @Override + protected int windowSize() { + return 1000; + } + + @Override + protected int windowSlide() { + return 100; + } + + @Override + protected int numKeys() { + return 100; + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java index 352f9f7..2cdfbe7 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() { + public IncrementalRocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.ROCKSDB_INCREMENTAL); } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java index 899b8d6..701b746 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/MemBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class MemBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public MemBackendEventTimeWindowCheckpointingITCase() { + public MemBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.MEM); } } http://git-wip-us.apache.org/repos/asf/flink/blob/09caa9ff/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java index da2bbc7..b7cbaa9 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/RocksDbBackendEventTimeWindowCheckpointingITCase.java @@ -18,9 +18,11 @@ package org.apache.flink.test.checkpointing; +import java.io.IOException; + public class RocksDbBackendEventTimeWindowCheckpointingITCase extends AbstractEventTimeWindowCheckpointingITCase { - public RocksDbBackendEventTimeWindowCheckpointingITCase() { + public RocksDbBackendEventTimeWindowCheckpointingITCase() throws IOException { super(StateBackendEnum.ROCKSDB_FULLY_ASYNC); }