Repository: flink
Updated Branches:
  refs/heads/master 40d5205bc -> 33f43114d
http://git-wip-us.apache.org/repos/asf/flink/blob/33f43114/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java deleted file mode 100644 index cc2147f..0000000 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DBStateCheckpointingTest.java +++ /dev/null @@ -1,258 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import static org.junit.Assert.assertEquals; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Random; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; - -import org.apache.commons.io.FileUtils; - -import org.apache.derby.drda.NetworkServerControl; - -import org.apache.flink.api.common.functions.RichMapFunction; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.memory.MemoryStateBackend; -import org.apache.flink.streaming.api.checkpoint.Checkpointed; -import org.apache.flink.streaming.api.datastream.DataStream; -import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; -import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; -import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; -import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.IdentityKeySelector; -import org.apache.flink.test.checkpointing.PartitionedStateCheckpointingITCase.NonSerializableLong; -import org.apache.flink.test.checkpointing.StreamFaultToleranceTestBase; - -import org.junit.After; -import org.junit.Before; - -@SuppressWarnings("serial") -public class DBStateCheckpointingTest extends StreamFaultToleranceTestBase { - - final long NUM_STRINGS = 1_000_000L; - final static int NUM_KEYS = 100; - private static NetworkServerControl server; - private static File tempDir; - - @Before - public void startDerbyServer() throws UnknownHostException, Exception { - server = new NetworkServerControl(InetAddress.getByName("localhost"), 1526, "flink", "flink"); - 
server.start(null); - tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - // We need to ensure that the Derby server starts properly before - // beginning the tests - DbStateBackendTest.ensureServerStarted(server); - } - - @After - public void stopDerbyServer() { - try { - server.shutdown(); - FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1")); - FileUtils.forceDelete(new File("derby.log")); - } catch (Exception ignore) { - } - } - - @Override - public void testProgram(StreamExecutionEnvironment env) { - env.enableCheckpointing(500); - - DbBackendConfig conf = new DbBackendConfig("flink", "flink", - "jdbc:derby://localhost:1526/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true"); - conf.setDbAdapter(new DerbyAdapter()); - conf.setKvStateCompactionFrequency(2); - - // We store the non-partitioned states (source offset) in-memory - DbStateBackend backend = new DbStateBackend(conf, new MemoryStateBackend()); - - env.setStateBackend(backend); - - DataStream<Integer> stream1 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2)); - DataStream<Integer> stream2 = env.addSource(new IntGeneratingSourceFunction(NUM_STRINGS / 2)); - - stream1.union(stream2).keyBy(new IdentityKeySelector<Integer>()).map(new OnceFailingPartitionedSum(NUM_STRINGS)) - .keyBy(0).addSink(new CounterSink()); - } - - @Override - public void postSubmit() { - // verify that we counted exactly right - for (Entry<Integer, Long> sum : OnceFailingPartitionedSum.allSums.entrySet()) { - assertEquals(new Long(sum.getKey() * NUM_STRINGS / NUM_KEYS), sum.getValue()); - } - for (Long count : CounterSink.allCounts.values()) { - assertEquals(new Long(NUM_STRINGS / NUM_KEYS), count); - } - - assertEquals(NUM_KEYS, CounterSink.allCounts.size()); - assertEquals(NUM_KEYS, OnceFailingPartitionedSum.allSums.size()); - } - - // -------------------------------------------------------------------------------------------- - // Custom Functions - // -------------------------------------------------------------------------------------------- - - private static class IntGeneratingSourceFunction extends RichParallelSourceFunction<Integer> - implements Checkpointed<Integer> { - - private final long numElements; - - private int index; - private int step; - - private Random rnd = new Random(); - - private volatile boolean isRunning = true; - - static final long[] counts = new long[PARALLELISM]; - - @Override - public void close() throws IOException { - counts[getRuntimeContext().getIndexOfThisSubtask()] = index; - } - - IntGeneratingSourceFunction(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(Configuration parameters) throws IOException { - step = getRuntimeContext().getNumberOfParallelSubtasks(); - if (index == 0) { - index = getRuntimeContext().getIndexOfThisSubtask(); - } - } - - @Override - public void run(SourceContext<Integer> ctx) throws Exception { - final Object lockingObject = ctx.getCheckpointLock(); - - while (isRunning && index < numElements) { - - synchronized (lockingObject) { - index += step; - ctx.collect(index % NUM_KEYS); - } - - if (rnd.nextDouble() < 0.008) { - Thread.sleep(1); - } - } - } - - @Override - public void cancel() { - isRunning = false; - } - - @Override - public Integer snapshotState(long checkpointId, long checkpointTimestamp) { - return index; - } - - @Override - public void restoreState(Integer state) { - index = state; - } - } - - private static class OnceFailingPartitionedSum extends 
RichMapFunction<Integer, Tuple2<Integer, Long>> { - - private static Map<Integer, Long> allSums = new ConcurrentHashMap<>(); - - private static volatile boolean hasFailed = false; - - private final long numElements; - - private long failurePos; - private long count; - - private ValueState<Long> sum; - - OnceFailingPartitionedSum(long numElements) { - this.numElements = numElements; - } - - @Override - public void open(Configuration parameters) throws IOException { - long failurePosMin = (long) (0.6 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - long failurePosMax = (long) (0.8 * numElements / getRuntimeContext().getNumberOfParallelSubtasks()); - - failurePos = (new Random().nextLong() % (failurePosMax - failurePosMin)) + failurePosMin; - count = 0; - sum = getRuntimeContext().getState( - new ValueStateDescriptor<>("my_state", Long.class, 0L)); - } - - @Override - public Tuple2<Integer, Long> map(Integer value) throws Exception { - count++; - if (!hasFailed && count >= failurePos) { - hasFailed = true; - throw new Exception("Test Failure"); - } - - long currentSum = sum.value() + value; - sum.update(currentSum); - allSums.put(value, currentSum); - return new Tuple2<Integer, Long>(value, currentSum); - } - } - - private static class CounterSink extends RichSinkFunction<Tuple2<Integer, Long>> { - - private static Map<Integer, Long> allCounts = new ConcurrentHashMap<>(); - - private ValueState<NonSerializableLong> aCounts; - private ValueState<Long> bCounts; - - @Override - public void open(Configuration parameters) throws IOException { - aCounts = getRuntimeContext().getState( - new ValueStateDescriptor<>("a", NonSerializableLong.class, NonSerializableLong.of(0L))); - - bCounts = getRuntimeContext().getState(new ValueStateDescriptor<>("b", Long.class, 0L)); - } - - @Override - public void invoke(Tuple2<Integer, Long> value) throws Exception { - long ac = aCounts.value().value; - long bc = bCounts.value(); - assertEquals(ac, bc); - - long currentCount = ac + 1; - aCounts.update(NonSerializableLong.of(currentCount)); - bCounts.update(currentCount); - - allCounts.put(value.f0, currentCount); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/33f43114/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java deleted file mode 100644 index adef7db..0000000 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DbStateBackendTest.java +++ /dev/null @@ -1,723 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import java.io.File; -import java.io.IOException; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; - -import com.google.common.base.Joiner; -import org.apache.commons.io.FileUtils; -import org.apache.derby.drda.NetworkServerControl; -import org.apache.flink.api.common.functions.ReduceFunction; -import org.apache.flink.api.common.state.ListState; -import org.apache.flink.api.common.state.ListStateDescriptor; -import org.apache.flink.api.common.state.ReducingState; -import org.apache.flink.api.common.state.ReducingStateDescriptor; -import org.apache.flink.api.common.state.ValueState; -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.common.typeutils.base.StringSerializer; -import org.apache.flink.api.common.typeutils.base.VoidSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.ConfigConstants; -import org.apache.flink.core.testutils.CommonTestUtils; -import org.apache.flink.runtime.execution.Environment; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.KvState; -import org.apache.flink.runtime.state.KvStateSnapshot; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import com.google.common.collect.Lists; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.Test; - -import com.google.common.base.Optional; - -import static org.junit.Assert.*; - -public class DbStateBackendTest { - - private static NetworkServerControl server; - private static File tempDir; - private static DbBackendConfig conf; - private static String url1; - private static String url2; - - @BeforeClass - public static void startDerbyServer() throws UnknownHostException, Exception { - server = new NetworkServerControl(InetAddress.getByName("localhost"), 1527, "flink", "flink"); - - // Async call, we need to ensure that the server starts before leaving - // the method - server.start(null); - - tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - conf = new DbBackendConfig("flink", "flink", - "jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true"); - conf.setDbAdapter(new DerbyAdapter()); - conf.setKvStateCompactionFrequency(1); - - url1 = "jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB1;create=true"; - url2 = "jdbc:derby://localhost:1527/" + tempDir.getAbsolutePath() + "/flinkDB2;create=true"; - - // We need to ensure that the Derby server starts properly before - // beginning the tests - ensureServerStarted(server); - } - - 
public static void ensureServerStarted(NetworkServerControl server) throws InterruptedException { - // We try to ping the server 10 times with 1s sleep in between - // If the ping succeeds without exception the server started - int retry = 0; - while (true) { - if (retry < 10) { - try { - server.ping(); - break; - } catch (Exception e) { - retry++; - Thread.sleep(1000); - } - } else { - throw new RuntimeException("Could not start the Derby server in 10 seconds."); - } - } - } - - @AfterClass - public static void stopDerbyServer() throws Exception { - try { - server.shutdown(); - FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB1")); - FileUtils.deleteDirectory(new File(tempDir.getAbsolutePath() + "/flinkDB2")); - FileUtils.forceDelete(new File("derby.log")); - } catch (Exception ignore) { - } - } - - @Test - public void testSetupAndSerialization() throws Exception { - DbStateBackend dbBackend = new DbStateBackend(conf); - - assertFalse(dbBackend.isInitialized()); - - // serialize / copy the backend - DbStateBackend backend = CommonTestUtils.createCopySerializable(dbBackend); - assertFalse(backend.isInitialized()); - - Environment env = new DummyEnvironment("test", 1, 0); - backend.initializeForJob(env, "dummy-setup-ser", StringSerializer.INSTANCE); - String jobId = env.getJobID().toString().substring(0, 16); - - assertNotNull(backend.getConnections()); - assertTrue( - isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId)); - - backend.disposeAllStateForCurrentJob(); - assertFalse( - isTableCreated(backend.getConnections().getFirst(), "checkpoints_" + jobId)); - backend.close(); - - assertTrue(backend.getConnections().getFirst().isClosed()); - } - - @Test - public void testSerializableState() throws Exception { - Environment env = new DummyEnvironment("test", 1, 0); - DbStateBackend backend = new DbStateBackend(conf); - String jobId = env.getJobID().toString().substring(0, 16); - - backend.initializeForJob(env, "dummy-ser-state", StringSerializer.INSTANCE); - - String state1 = "dummy state"; - String state2 = "row row row your boat"; - Integer state3 = 42; - - StateHandle<String> handle1 = backend.checkpointStateSerializable(state1, 439568923746L, - System.currentTimeMillis()); - StateHandle<String> handle2 = backend.checkpointStateSerializable(state2, 439568923746L, - System.currentTimeMillis()); - StateHandle<Integer> handle3 = backend.checkpointStateSerializable(state3, 439568923746L, - System.currentTimeMillis()); - - assertEquals(state1, handle1.getState(getClass().getClassLoader())); - handle1.discardState(); - - assertEquals(state2, handle2.getState(getClass().getClassLoader())); - handle2.discardState(); - - assertFalse(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId)); - - assertEquals(state3, handle3.getState(getClass().getClassLoader())); - handle3.discardState(); - - assertTrue(isTableEmpty(backend.getConnections().getFirst(), "checkpoints_" + jobId)); - - backend.close(); - - } - - @Test - public void testKeyValueState() throws Exception { - - // We will create the DbStateBackend backed with a FsStateBackend for - // nonPartitioned states - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir)); - - DbStateBackend backend = new DbStateBackend(conf, fileBackend); - - Environment env = new DummyEnvironment("test", 2, 0); - - backend.initializeForJob(env, "dummy_test_kv", 
IntSerializer.INSTANCE); - - ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE, - new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, null)); - - LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state; - - String tableName = "dummy_test_kv_state1"; - assertTrue(isTableCreated(backend.getConnections().getFirst(), tableName)); - - assertEquals(0, kv.size()); - - kv.setCurrentNamespace(1); - - // some modifications to the state - kv.setCurrentKey(1); - assertNull(kv.value()); - kv.update("1"); - assertEquals(1, kv.size()); - kv.setCurrentKey(2); - assertNull(kv.value()); - kv.update("2"); - assertEquals(2, kv.size()); - kv.setCurrentKey(1); - assertEquals("1", kv.value()); - assertEquals(2, kv.size()); - - kv.snapshot(682375462378L, 100); - - // make some more modifications - kv.setCurrentKey(1); - kv.update("u1"); - kv.setCurrentKey(2); - kv.update("u2"); - kv.setCurrentKey(3); - kv.update("u3"); - - // draw another snapshot - KvStateSnapshot<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> snapshot2 = kv.snapshot(682375462379L, - 200); - - // validate the original state - assertEquals(3, kv.size()); - kv.setCurrentKey(1); - assertEquals("u1", kv.value()); - kv.setCurrentKey(2); - assertEquals("u2", kv.value()); - kv.setCurrentKey(3); - assertEquals("u3", kv.value()); - - // restore the first snapshot and validate it - KvState<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState( - backend, - IntSerializer.INSTANCE, - getClass().getClassLoader(), - 6823754623710L); - - restored2.setCurrentNamespace(1); - - @SuppressWarnings("unchecked") - ValueState<String> restoredState2 = (ValueState<String>) restored2; - - restored2.setCurrentKey(1); - assertEquals("u1", restoredState2.value()); - restored2.setCurrentKey(2); - assertEquals("u2", restoredState2.value()); - restored2.setCurrentKey(3); - assertEquals("u3", restoredState2.value()); - - backend.close(); - } finally { - deleteDirectorySilently(tempDir); - } - } - - @Test - @SuppressWarnings("unchecked,rawtypes") - public void testListState() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir)); - - DbStateBackend backend = new DbStateBackend(conf, fileBackend); - - Environment env = new DummyEnvironment("test", 2, 0); - - backend.initializeForJob(env, "dummy_test_kv_list", IntSerializer.INSTANCE); - - ListStateDescriptor<String> kvId = new ListStateDescriptor<>("id", StringSerializer.INSTANCE); - ListState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - - @SuppressWarnings("unchecked") - KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> kv = - (KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend>) state; - - Joiner joiner = Joiner.on(","); - // some modifications to the state - kv.setCurrentKey(1); - assertEquals("", joiner.join(state.get())); - state.add("1"); - kv.setCurrentKey(2); - assertEquals("", joiner.join(state.get())); - state.add("2"); - kv.setCurrentKey(1); - assertEquals("1", joiner.join(state.get())); - - // draw a snapshot - KvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> snapshot1 = - kv.snapshot(682375462378L, 2); - - // make some more modifications - 
kv.setCurrentKey(1); - state.add("u1"); - kv.setCurrentKey(2); - state.add("u2"); - kv.setCurrentKey(3); - state.add("u3"); - - // draw another snapshot - KvStateSnapshot<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> snapshot2 = - kv.snapshot(682375462379L, 4); - - // validate the original state - kv.setCurrentKey(1); - assertEquals("1,u1", joiner.join(state.get())); - kv.setCurrentKey(2); - assertEquals("2,u2", joiner.join(state.get())); - kv.setCurrentKey(3); - assertEquals("u3", joiner.join(state.get())); - - kv.dispose(); - - // restore the second snapshot and validate it - KvState<Integer, Void, ListState<String>, ListStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 20); - - @SuppressWarnings("unchecked") - ListState<String> restored2State = (ListState<String>) restored2; - - restored2.setCurrentKey(1); - assertEquals("1,u1", joiner.join(restored2State.get())); - restored2.setCurrentKey(2); - assertEquals("2,u2", joiner.join(restored2State.get())); - restored2.setCurrentKey(3); - assertEquals("u3", joiner.join(restored2State.get())); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - @SuppressWarnings("unchecked,rawtypes") - public void testReducingState() { - File tempDir = new File(ConfigConstants.DEFAULT_TASK_MANAGER_TMP_PATH, UUID.randomUUID().toString()); - try { - FsStateBackend fileBackend = new FsStateBackend(localFileUri(tempDir)); - - DbStateBackend backend = new DbStateBackend(conf, fileBackend); - - Environment env = new DummyEnvironment("test", 2, 0); - - backend.initializeForJob(env, "dummy_test_kv_reduce", IntSerializer.INSTANCE); - - ReducingStateDescriptor<String> kvId = new ReducingStateDescriptor<>("id", - new ReduceFunction<String>() { - private static final long serialVersionUID = 1L; - - @Override - public String reduce(String value1, String value2) throws Exception { - return value1 + "," + value2; - } - }, - StringSerializer.INSTANCE); - ReducingState<String> state = backend.getPartitionedState(null, VoidSerializer.INSTANCE, kvId); - - @SuppressWarnings("unchecked") - KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> kv = - (KvState<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend>) state; - - Joiner joiner = Joiner.on(","); - // some modifications to the state - kv.setCurrentKey(1); - assertEquals(null, state.get()); - state.add("1"); - kv.setCurrentKey(2); - assertEquals(null, state.get()); - state.add("2"); - kv.setCurrentKey(1); - assertEquals("1", state.get()); - - // draw a snapshot - KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> snapshot1 = - kv.snapshot(682375462378L, 2); - - // make some more modifications - kv.setCurrentKey(1); - state.add("u1"); - kv.setCurrentKey(2); - state.add("u2"); - kv.setCurrentKey(3); - state.add("u3"); - - // draw another snapshot - KvStateSnapshot<Integer, Void, ReducingState<String>, ReducingStateDescriptor<String>, DbStateBackend> snapshot2 = - kv.snapshot(682375462379L, 4); - - // validate the original state - kv.setCurrentKey(1); - assertEquals("1,u1", state.get()); - kv.setCurrentKey(2); - assertEquals("2,u2", state.get()); - kv.setCurrentKey(3); - assertEquals("u3", state.get()); - - kv.dispose(); - - // restore the second snapshot and validate it - KvState<Integer, Void, ReducingState<String>, 
ReducingStateDescriptor<String>, DbStateBackend> restored2 = snapshot2.restoreState( - backend, - IntSerializer.INSTANCE, - this.getClass().getClassLoader(), 20); - - @SuppressWarnings("unchecked") - ReducingState<String> restored2State = (ReducingState<String>) restored2; - - restored2.setCurrentKey(1); - assertEquals("1,u1", restored2State.get()); - restored2.setCurrentKey(2); - assertEquals("2,u2", restored2State.get()); - restored2.setCurrentKey(3); - assertEquals("u3", restored2State.get()); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } - - @Test - public void testCompaction() throws Exception { - DbBackendConfig conf = new DbBackendConfig("flink", "flink", url1); - MockAdapter adapter = new MockAdapter(); - conf.setKvStateCompactionFrequency(2); - conf.setDbAdapter(adapter); - - DbStateBackend backend1 = new DbStateBackend(conf); - DbStateBackend backend2 = new DbStateBackend(conf); - DbStateBackend backend3 = new DbStateBackend(conf); - - backend1.initializeForJob(new DummyEnvironment("test", 3, 0), "dummy_1", StringSerializer.INSTANCE); - backend2.initializeForJob(new DummyEnvironment("test", 3, 1), "dummy_2", StringSerializer.INSTANCE); - backend3.initializeForJob(new DummyEnvironment("test", 3, 2), "dummy_3", StringSerializer.INSTANCE); - - ValueState<String> s1State = backend1.createValueState(StringSerializer.INSTANCE, - new ValueStateDescriptor<>("a1", StringSerializer.INSTANCE, null)); - - ValueState<String> s2State = backend2.createValueState(StringSerializer.INSTANCE, - new ValueStateDescriptor<>("a2", StringSerializer.INSTANCE, null)); - - ValueState<String> s3State = backend3.createValueState(StringSerializer.INSTANCE, - new ValueStateDescriptor<>("a3", StringSerializer.INSTANCE, null)); - - LazyDbValueState<?, ?, ?> s1 = (LazyDbValueState<?, ?, ?>) s1State; - LazyDbValueState<?, ?, ?> s2 = (LazyDbValueState<?, ?, ?>) s2State; - LazyDbValueState<?, ?, ?> s3 = (LazyDbValueState<?, ?, ?>) s3State; - - assertTrue(s1.isCompactor()); - assertFalse(s2.isCompactor()); - assertFalse(s3.isCompactor()); - assertNotNull(s1.getExecutor()); - assertNull(s2.getExecutor()); - assertNull(s3.getExecutor()); - - s1.snapshot(1, 100); - s1.notifyCheckpointComplete(1); - s1.snapshot(2, 200); - s1.snapshot(3, 300); - s1.notifyCheckpointComplete(2); - s1.notifyCheckpointComplete(3); - s1.snapshot(4, 400); - s1.snapshot(5, 500); - s1.notifyCheckpointComplete(4); - s1.notifyCheckpointComplete(5); - - s1.dispose(); - s2.dispose(); - s3.dispose(); - - // Wait until the compaction completes - s1.getExecutor().awaitTermination(5, TimeUnit.SECONDS); - assertEquals(2, adapter.numCompcations.get()); - assertEquals(5, adapter.keptAlive); - - backend1.close(); - backend2.close(); - backend3.close(); - } - - @Test - public void testCaching() throws Exception { - - List<String> urls = Lists.newArrayList(url1, url2); - DbBackendConfig conf = new DbBackendConfig("flink", "flink", - urls); - - conf.setDbAdapter(new DerbyAdapter()); - conf.setKvCacheSize(3); - conf.setMaxKvInsertBatchSize(2); - - // We evict 2 elements when the cache is full - conf.setMaxKvCacheEvictFraction(0.6f); - - DbStateBackend backend = new DbStateBackend(conf); - - Environment env = new DummyEnvironment("test", 2, 0); - - String tableName = "dummy_test_caching_state1"; - assertFalse(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName)); - assertFalse(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName)); - - backend.initializeForJob(env, 
"dummy_test_caching", IntSerializer.INSTANCE); - - ValueState<String> state = backend.createValueState(IntSerializer.INSTANCE, - new ValueStateDescriptor<>("state1", StringSerializer.INSTANCE, "a")); - - LazyDbValueState<Integer, Integer, String> kv = (LazyDbValueState<Integer, Integer, String>) state; - - assertTrue(isTableCreated(DriverManager.getConnection(url1, "flink", "flink"), tableName)); - assertTrue(isTableCreated(DriverManager.getConnection(url2, "flink", "flink"), tableName)); - - Map<Tuple2<Integer, Integer>, Optional<String>> cache = kv.getStateCache(); - Map<Tuple2<Integer, Integer>, Optional<String>> modified = kv.getModified(); - - assertEquals(0, kv.size()); - - kv.setCurrentNamespace(1); - - // some modifications to the state - kv.setCurrentKey(1); - assertEquals("a", kv.value()); - - kv.update(null); - assertEquals(1, kv.size()); - kv.setCurrentKey(2); - assertEquals("a", kv.value()); - kv.update("2"); - assertEquals(2, kv.size()); - assertEquals("2", kv.value()); - - kv.setCurrentKey(1); - assertEquals("a", kv.value()); - - kv.setCurrentKey(3); - kv.update("3"); - assertEquals("3", kv.value()); - - assertTrue(modified.containsKey(Tuple2.of(1, 1))); - assertTrue(modified.containsKey(Tuple2.of(2, 1))); - assertTrue(modified.containsKey(Tuple2.of(3, 1))); - - // 1,2 should be evicted as the cache filled - kv.setCurrentKey(4); - kv.update("4"); - assertEquals("4", kv.value()); - - assertFalse(modified.containsKey(Tuple2.of(1, 1))); - assertFalse(modified.containsKey(Tuple2.of(2, 1))); - assertTrue(modified.containsKey(Tuple2.of(3, 1))); - assertTrue(modified.containsKey(Tuple2.of(4, 1))); - - assertEquals(Optional.of("3"), cache.get(Tuple2.of(3, 1))); - assertEquals(Optional.of("4"), cache.get(Tuple2.of(4, 1))); - assertFalse(cache.containsKey(Tuple2.of(1, 1))); - assertFalse(cache.containsKey(Tuple2.of(2, 1))); - - // draw a snapshot - kv.snapshot(682375462378L, 100); - - assertTrue(modified.isEmpty()); - - // make some more modifications - kv.setCurrentKey(2); - assertEquals("2", kv.value()); - kv.update(null); - assertEquals("a", kv.value()); - - assertTrue(modified.containsKey(Tuple2.of(2, 1))); - assertEquals(1, modified.size()); - - assertEquals(Optional.of("3"), cache.get(Tuple2.of(3, 1))); - assertEquals(Optional.of("4"), cache.get(Tuple2.of(4, 1))); - assertEquals(Optional.absent(), cache.get(Tuple2.of(2, 1))); - assertFalse(cache.containsKey(Tuple2.of(1, 1))); - - assertTrue(modified.containsKey(Tuple2.of(2, 1))); - assertFalse(modified.containsKey(Tuple2.of(3, 1))); - assertFalse(modified.containsKey(Tuple2.of(4, 1))); - assertTrue(cache.containsKey(Tuple2.of(3, 1))); - assertTrue(cache.containsKey(Tuple2.of(4, 1))); - - // clear cache from initial keys - - kv.setCurrentKey(5); - kv.value(); - kv.setCurrentKey(6); - kv.value(); - kv.setCurrentKey(7); - kv.value(); - - assertFalse(modified.containsKey(Tuple2.of(5, 1))); - assertTrue(modified.containsKey(Tuple2.of(6, 1))); - assertTrue(modified.containsKey(Tuple2.of(7, 1))); - - assertFalse(cache.containsKey(Tuple2.of(1, 1))); - assertFalse(cache.containsKey(Tuple2.of(2, 1))); - assertFalse(cache.containsKey(Tuple2.of(3, 1))); - assertFalse(cache.containsKey(Tuple2.of(4, 1))); - - kv.setCurrentKey(2); - assertEquals("a", kv.value()); - - long checkpointTs = System.currentTimeMillis(); - - // Draw a snapshot that we will restore later - KvStateSnapshot<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> snapshot1 = kv.snapshot(682375462379L, checkpointTs); - - 
assertTrue(modified.isEmpty()); - - // Do some updates then draw another snapshot (imitate a partial - // failure), these updates should not be visible if we restore snapshot1 - kv.setCurrentKey(1); - kv.update("123"); - kv.setCurrentKey(3); - kv.update("456"); - kv.setCurrentKey(2); - kv.notifyCheckpointComplete(682375462379L); - kv.update("2"); - kv.setCurrentKey(4); - kv.update("4"); - kv.update("5"); - - kv.snapshot(6823754623710L, checkpointTs + 10); - - // restore the second snapshot and validate it (we set a new default - // value here to make sure that the default wasn't written) - KvState<Integer, Integer, ValueState<String>, ValueStateDescriptor<String>, DbStateBackend> restored = snapshot1.restoreState( - backend, - IntSerializer.INSTANCE, - getClass().getClassLoader(), - 6823754623711L); - - LazyDbValueState<Integer, Integer, String> lazyRestored = (LazyDbValueState<Integer, Integer, String>) restored; - - cache = lazyRestored.getStateCache(); - modified = lazyRestored.getModified(); - - restored.setCurrentNamespace(1); - - @SuppressWarnings("unchecked") - ValueState<String> restoredState = (ValueState<String>) restored; - - restored.setCurrentKey(1); - - assertEquals("a", restoredState.value()); - // make sure that we got the default and not some value from the db - assertEquals(cache.get(Tuple2.of(1, 1)), Optional.<String>absent()); - restored.setCurrentKey(2); - assertEquals("a", restoredState.value()); - // make sure that we got the default and not some value from the db - assertEquals(cache.get(Tuple2.of(2, 1)), Optional.<String>absent()); - restored.setCurrentKey(3); - assertEquals("3", restoredState.value()); - restored.setCurrentKey(4); - assertEquals("4", restoredState.value()); - - backend.close(); - } - - private static boolean isTableCreated(Connection con, String tableName) throws SQLException { - return con.getMetaData().getTables(null, null, tableName.toUpperCase(), null).next(); - } - - private static boolean isTableEmpty(Connection con, String tableName) throws SQLException { - try (Statement smt = con.createStatement()) { - ResultSet res = smt.executeQuery("select * from " + tableName); - return !res.next(); - } - } - - private static String localFileUri(File path) { - return path.toURI().toString(); - } - - private static void deleteDirectorySilently(File dir) { - try { - FileUtils.deleteDirectory(dir); - } catch (IOException ignored) { - } - } - - private static class MockAdapter extends DerbyAdapter { - - private static final long serialVersionUID = 1L; - public AtomicInteger numCompcations = new AtomicInteger(0); - public int keptAlive = 0; - - @Override - public void compactKvStates(String kvStateId, Connection con, long lowerTs, long upperTs) throws SQLException { - numCompcations.incrementAndGet(); - } - - @Override - public void keepAlive(Connection con) throws SQLException { - keptAlive++; - } - } - -} http://git-wip-us.apache.org/repos/asf/flink/blob/33f43114/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java ---------------------------------------------------------------------- diff --git a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java b/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java deleted file mode 100644 index 5331956..0000000 --- a/flink-contrib/flink-streaming-contrib/src/test/java/org/apache/flink/contrib/streaming/state/DerbyAdapter.java +++ /dev/null @@ 
-1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.contrib.streaming.state; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Types; -import java.util.List; -import java.util.concurrent.Callable; - -import org.apache.flink.api.java.tuple.Tuple2; - -/** - * Adapter for the Derby JDBC driver which has slightly restricted CREATE TABLE - * and SELECT semantics compared to the default assumptions. - * - */ -public class DerbyAdapter extends MySqlAdapter { - - private static final long serialVersionUID = 1L; - - /** - * We need to override this method as Derby does not support the - * "IF NOT EXISTS" clause at table creation - */ - @Override - public void createCheckpointsTable(String jobId, Connection con) throws SQLException { - - try (Statement smt = con.createStatement()) { - smt.executeUpdate( - "CREATE TABLE checkpoints_" + jobId - + " (" - + "checkpointId bigint, " - + "timestamp bigint, " - + "handleId bigint," - + "checkpoint blob," - + "PRIMARY KEY (handleId)" - + ")"); - } catch (SQLException se) { - if (se.getSQLState().equals("X0Y32")) { - // table already created, ignore - } else { - throw se; - } - } - } - - /** - * We need to override this method as Derby does not support the - * "IF NOT EXISTS" clause at table creation - */ - @Override - public void createKVStateTable(String stateId, Connection con) throws SQLException { - validateStateId(stateId); - try (Statement smt = con.createStatement()) { - smt.executeUpdate( - "CREATE TABLE " + stateId - + " (" - + "timestamp bigint, " - + "k varchar(256) for bit data, " - + "v blob, " - + "PRIMARY KEY (k, timestamp)" - + ")"); - } catch (SQLException se) { - if (se.getSQLState().equals("X0Y32")) { - // table already created, ignore - } else { - throw se; - } - } - } - - /** - * We need to override this method as Derby does not support "LIMIT n" for - * select statements. - */ - @Override - public String prepareKeyLookup(String stateId) throws SQLException { - validateStateId(stateId); - return "SELECT v " + "FROM " + stateId - + " WHERE k = ? " - + " AND timestamp <= ?" 
- + " ORDER BY timestamp DESC"; - } - - @Override - public void compactKvStates(String stateId, Connection con, long lowerBound, long upperBound) - throws SQLException { - validateStateId(stateId); - - try (Statement smt = con.createStatement()) { - smt.executeUpdate("DELETE FROM " + stateId + " t1" - + " WHERE EXISTS" - + " (" - + " SELECT * FROM " + stateId + " t2" - + " WHERE t2.k = t1.k" - + " AND t2.timestamp > t1.timestamp" - + " AND t2.timestamp <=" + upperBound - + " AND t2.timestamp >= " + lowerBound - + " )"); - } - } - - @Override - public String prepareKVCheckpointInsert(String stateId) throws SQLException { - validateStateId(stateId); - return "INSERT INTO " + stateId + " (timestamp, k, v) VALUES (?,?,?)"; - } - - @Override - public void insertBatch(final String stateId, final DbBackendConfig conf, - final Connection con, final PreparedStatement insertStatement, final long checkpointTs, - final List<Tuple2<byte[], byte[]>> toInsert) throws IOException { - - SQLRetrier.retry(new Callable<Void>() { - public Void call() throws Exception { - con.setAutoCommit(false); - for (Tuple2<byte[], byte[]> kv : toInsert) { - setKVInsertParams(stateId, insertStatement, checkpointTs, kv.f0, kv.f1); - insertStatement.addBatch(); - } - insertStatement.executeBatch(); - con.commit(); - con.setAutoCommit(true); - insertStatement.clearBatch(); - return null; - } - }, new Callable<Void>() { - public Void call() throws Exception { - con.rollback(); - insertStatement.clearBatch(); - return null; - } - }, conf.getMaxNumberOfSqlRetries(), conf.getSleepBetweenSqlRetries()); - } - - private void setKVInsertParams(String stateId, PreparedStatement insertStatement, long checkpointId, - byte[] key, byte[] value) throws SQLException { - insertStatement.setLong(1, checkpointId); - insertStatement.setBytes(2, key); - if (value != null) { - insertStatement.setBytes(3, value); - } else { - insertStatement.setNull(3, Types.BLOB); - } - } -}