http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java deleted file mode 100644 index 5964b72..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/FsCheckpointStateOutputStreamTest.java +++ /dev/null @@ -1,128 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.state; - -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.runtime.state.filesystem.FileStreamStateHandle; -import org.apache.flink.runtime.state.filesystem.FsStateBackend; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; - -import org.junit.Test; - -import java.io.File; -import java.io.InputStream; -import java.util.Random; - -import static org.junit.Assert.*; - -public class FsCheckpointStateOutputStreamTest { - - /** The temp dir, obtained in a platform neutral way */ - private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI()); - - - @Test(expected = IllegalArgumentException.class) - public void testWrongParameters() { - // this should fail - new FsStateBackend.FsCheckpointStateOutputStream( - TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000); - } - - - @Test - public void testEmptyState() throws Exception { - AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream( - TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512); - - StreamStateHandle handle = stream.closeAndGetHandle(); - assertTrue(handle instanceof ByteStreamStateHandle); - - InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader()); - assertEquals(-1, inStream.read()); - } - - @Test - public void testStateBlowMemThreshold() throws Exception { - runTest(222, 999, 512, false); - } - - @Test - public void testStateOneBufferAboveThreshold() throws Exception { - runTest(896, 1024, 15, true); - } - - @Test - public void testStateAboveMemThreshold() throws Exception { - runTest(576446, 259, 17, true); - } - - @Test - public void testZeroThreshold() throws Exception { - runTest(16678, 4096, 0, true); - } - - private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { - AbstractStateBackend.CheckpointStateOutputStream stream = - new FsStateBackend.FsCheckpointStateOutputStream( - TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold); - - Random rnd = new Random(); - byte[] original = new byte[numBytes]; - byte[] bytes = new byte[original.length]; - - rnd.nextBytes(original); - System.arraycopy(original, 0, bytes, 0, original.length); - - // the test writes a mixture of writing individual bytes and byte arrays - int pos = 0; - while (pos < bytes.length) { - boolean single = rnd.nextBoolean(); - if (single) { - stream.write(bytes[pos++]); - } - else { - int num = rnd.nextInt(Math.min(10, bytes.length - pos)); - stream.write(bytes, pos, num); - pos += num; - } - } - - StreamStateHandle handle = stream.closeAndGetHandle(); - if (expectFile) { - assertTrue(handle instanceof FileStreamStateHandle); - } else { - assertTrue(handle instanceof ByteStreamStateHandle); - } - - // make sure the writing process did not alter the original byte array - assertArrayEquals(original, bytes); - - InputStream inStream = handle.getState(ClassLoader.getSystemClassLoader()); - byte[] validation = new byte[bytes.length]; - int bytesRead = inStream.read(validation); - - assertEquals(numBytes, bytesRead); - assertEquals(-1, inStream.read()); - - assertArrayEquals(bytes, validation); - - handle.discardState(); - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java new file mode 100644 index 0000000..95564cc --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeOffsetTest.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.junit.Assert; +import org.junit.Test; + +import java.util.Arrays; + +public class KeyGroupRangeOffsetTest { + + @Test + public void testKeyGroupIntersection() { + long[] offsets = new long[9]; + for (int i = 0; i < offsets.length; ++i) { + offsets[i] = i; + } + + int startKeyGroup = 2; + + KeyGroupRangeOffsets keyGroupRangeOffsets = new KeyGroupRangeOffsets(KeyGroupRange.of(startKeyGroup, 10), offsets); + KeyGroupRangeOffsets intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(3, 7)); + KeyGroupRangeOffsets expected = new KeyGroupRangeOffsets( + KeyGroupRange.of(3, 7), Arrays.copyOfRange(offsets, 3 - startKeyGroup, 8 - startKeyGroup)); + Assert.assertEquals(expected, intersection); + + Assert.assertEquals(keyGroupRangeOffsets, keyGroupRangeOffsets.getIntersection( + keyGroupRangeOffsets.getKeyGroupRange())); + + intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(11, 13)); + Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, intersection.getKeyGroupRange()); + Assert.assertFalse(intersection.iterator().hasNext()); + + intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(5, 13)); + expected = new KeyGroupRangeOffsets(KeyGroupRange.of(5, 10), Arrays.copyOfRange( + offsets, 5 - startKeyGroup, 11 - startKeyGroup)); + Assert.assertEquals(expected, intersection); + + intersection = keyGroupRangeOffsets.getIntersection(KeyGroupRange.of(0, 2)); + expected = new KeyGroupRangeOffsets(KeyGroupRange.of(2, 2), Arrays.copyOfRange( + offsets, 2 - startKeyGroup, 3 - startKeyGroup)); + Assert.assertEquals(intersection, intersection); + } + + @Test + public void testKeyGroupRangeOffsetsBasics() { + testKeyGroupRangeOffsetsBasicsInternal(0, 0); + testKeyGroupRangeOffsetsBasicsInternal(0, 1); + testKeyGroupRangeOffsetsBasicsInternal(1, 2); + testKeyGroupRangeOffsetsBasicsInternal(42, 42); + testKeyGroupRangeOffsetsBasicsInternal(3, 7); + testKeyGroupRangeOffsetsBasicsInternal(0, Short.MAX_VALUE); + testKeyGroupRangeOffsetsBasicsInternal(Short.MAX_VALUE - 1, Short.MAX_VALUE); + + try { + testKeyGroupRangeOffsetsBasicsInternal(-3, 2); + Assert.fail(); + } catch (IllegalArgumentException ex) { + //expected + } + + KeyGroupRangeOffsets testNoGivenOffsets = new KeyGroupRangeOffsets(3, 7); + for (int i = 3; i <= 7; ++i) { + testNoGivenOffsets.setKeyGroupOffset(i, i + 1); + } + for (int i = 3; i <= 7; ++i) { + Assert.assertEquals(i + 1, testNoGivenOffsets.getKeyGroupOffset(i)); + } + + } + + private void testKeyGroupRangeOffsetsBasicsInternal(int startKeyGroup, int endKeyGroup) { + + long[] offsets = new long[endKeyGroup - startKeyGroup + 1]; + for (int i = 0; i < offsets.length; ++i) { + offsets[i] = i; + } + + KeyGroupRangeOffsets keyGroupRange = new KeyGroupRangeOffsets(startKeyGroup, endKeyGroup, offsets); + KeyGroupRangeOffsets sameButDifferentConstr = + new KeyGroupRangeOffsets(KeyGroupRange.of(startKeyGroup, endKeyGroup), offsets); + Assert.assertEquals(keyGroupRange, sameButDifferentConstr); + + int numberOfKeyGroup = keyGroupRange.getKeyGroupRange().getNumberOfKeyGroups(); + Assert.assertEquals(Math.max(0, endKeyGroup - startKeyGroup + 1), numberOfKeyGroup); + if (numberOfKeyGroup > 0) { + Assert.assertEquals(startKeyGroup, keyGroupRange.getKeyGroupRange().getStartKeyGroup()); + Assert.assertEquals(endKeyGroup, keyGroupRange.getKeyGroupRange().getEndKeyGroup()); + int c = startKeyGroup; + for (Tuple2<Integer, Long> tuple : keyGroupRange) { + Assert.assertEquals(c, (int) tuple.f0); + Assert.assertTrue(keyGroupRange.getKeyGroupRange().contains(tuple.f0)); + Assert.assertEquals((long) c - startKeyGroup, (long) tuple.f1); + ++c; + } + + for (int i = startKeyGroup; i <= endKeyGroup; ++i) { + Assert.assertEquals(i - startKeyGroup, keyGroupRange.getKeyGroupOffset(i)); + } + + int newOffset = 42; + for (int i = startKeyGroup; i <= endKeyGroup; ++i) { + keyGroupRange.setKeyGroupOffset(i, newOffset); + ++newOffset; + } + + for (int i = startKeyGroup; i <= endKeyGroup; ++i) { + Assert.assertEquals(42 + i - startKeyGroup, keyGroupRange.getKeyGroupOffset(i)); + } + + Assert.assertEquals(endKeyGroup + 1, c); + Assert.assertFalse(keyGroupRange.getKeyGroupRange().contains(startKeyGroup - 1)); + Assert.assertFalse(keyGroupRange.getKeyGroupRange().contains(endKeyGroup + 1)); + } else { + Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, keyGroupRange); + } + } + +} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java new file mode 100644 index 0000000..ab0c327 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/KeyGroupRangeTest.java @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state; + +import org.junit.Assert; +import org.junit.Test; + +public class KeyGroupRangeTest { + + @Test + public void testKeyGroupIntersection() { + KeyGroupRange keyGroupRange1 = KeyGroupRange.of(0, 10); + KeyGroupRange keyGroupRange2 = KeyGroupRange.of(3, 7); + KeyGroupRange intersection = keyGroupRange1.getIntersection(keyGroupRange2); + Assert.assertEquals(3, intersection.getStartKeyGroup()); + Assert.assertEquals(7, intersection.getEndKeyGroup()); + Assert.assertEquals(intersection, keyGroupRange2.getIntersection(keyGroupRange1)); + + Assert.assertEquals(keyGroupRange1, keyGroupRange1.getIntersection(keyGroupRange1)); + + keyGroupRange1 = KeyGroupRange.of(0,5); + keyGroupRange2 = KeyGroupRange.of(6,10); + intersection =keyGroupRange1.getIntersection(keyGroupRange2); + Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, intersection); + Assert.assertEquals(intersection, keyGroupRange2.getIntersection(keyGroupRange1)); + + keyGroupRange1 = KeyGroupRange.of(0, 10); + keyGroupRange2 = KeyGroupRange.of(5, 20); + intersection = keyGroupRange1.getIntersection(keyGroupRange2); + Assert.assertEquals(5, intersection.getStartKeyGroup()); + Assert.assertEquals(10, intersection.getEndKeyGroup()); + Assert.assertEquals(intersection, keyGroupRange2.getIntersection(keyGroupRange1)); + + keyGroupRange1 = KeyGroupRange.of(3, 12); + keyGroupRange2 = KeyGroupRange.of(0, 10); + intersection = keyGroupRange1.getIntersection(keyGroupRange2); + Assert.assertEquals(3, intersection.getStartKeyGroup()); + Assert.assertEquals(10, intersection.getEndKeyGroup()); + Assert.assertEquals(intersection, keyGroupRange2.getIntersection(keyGroupRange1)); + } + + + @Test + public void testKeyGroupRangeBasics() { + testKeyGroupRangeBasicsInternal(0, 0); + testKeyGroupRangeBasicsInternal(0, 1); + testKeyGroupRangeBasicsInternal(1, 2); + testKeyGroupRangeBasicsInternal(42, 42); + testKeyGroupRangeBasicsInternal(3, 7); + testKeyGroupRangeBasicsInternal(0, Short.MAX_VALUE); + testKeyGroupRangeBasicsInternal(Short.MAX_VALUE - 1, Short.MAX_VALUE); + + try { + testKeyGroupRangeBasicsInternal(-3, 2); + Assert.fail(); + } catch (IllegalArgumentException ex) { + //expected + } + + } + + private void testKeyGroupRangeBasicsInternal(int startKeyGroup, int endKeyGroup) { + KeyGroupRange keyGroupRange = KeyGroupRange.of(startKeyGroup, endKeyGroup); + int numberOfKeyGroup = keyGroupRange.getNumberOfKeyGroups(); + Assert.assertEquals(Math.max(0, endKeyGroup - startKeyGroup + 1), numberOfKeyGroup); + if (keyGroupRange.getNumberOfKeyGroups() > 0) { + Assert.assertEquals(startKeyGroup, keyGroupRange.getStartKeyGroup()); + Assert.assertEquals(endKeyGroup, keyGroupRange.getEndKeyGroup()); + int c = startKeyGroup; + for(int i : keyGroupRange) { + Assert.assertEquals(c, i); + Assert.assertTrue(keyGroupRange.contains(i)); + ++c; + } + + Assert.assertEquals(endKeyGroup + 1, c); + Assert.assertFalse(keyGroupRange.contains(startKeyGroup - 1)); + Assert.assertFalse(keyGroupRange.contains(endKeyGroup + 1)); + } else { + Assert.assertEquals(KeyGroupRange.EMPTY_KEY_GROUP, keyGroupRange); + } + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java index d3b4dbc..940b337 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/MemoryStateBackendTest.java @@ -57,37 +57,26 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack public void testReducingStateRestoreWithWrongSerializers() {} @Test - public void testSerializableState() { + public void testOversizedState() { try { - MemoryStateBackend backend = new MemoryStateBackend(); + MemoryStateBackend backend = new MemoryStateBackend(10); HashMap<String, Integer> state = new HashMap<>(); state.put("hey there", 2); state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); - StateHandle<HashMap<String, Integer>> handle = backend.checkpointStateSerializable(state, 12, 459); - assertNotNull(handle); + try { + AbstractStateBackend.CheckpointStateOutputStream outStream = backend.createCheckpointStateOutputStream( + 12, + 459); - HashMap<String, Integer> restored = handle.getState(getClass().getClassLoader()); - assertEquals(state, restored); - } - catch (Exception e) { - e.printStackTrace(); - fail(e.getMessage()); - } - } + ObjectOutputStream oos = new ObjectOutputStream(outStream); + oos.writeObject(state); - @Test - public void testOversizedState() { - try { - MemoryStateBackend backend = new MemoryStateBackend(10); + oos.flush(); - HashMap<String, Integer> state = new HashMap<>(); - state.put("hey there", 2); - state.put("the crazy brown fox stumbles over a sentence that does not contain every letter", 77); + outStream.closeAndGetHandle(); - try { - backend.checkpointStateSerializable(state, 12, 459); fail("this should cause an exception"); } catch (IOException e) { @@ -117,7 +106,7 @@ public class MemoryStateBackendTest extends StateBackendTestBase<MemoryStateBack assertNotNull(handle); - ObjectInputStream ois = new ObjectInputStream(handle.getState(getClass().getClassLoader())); + ObjectInputStream ois = new ObjectInputStream(handle.openInputStream()); assertEquals(state, ois.readObject()); assertTrue(ois.available() <= 0); ois.close(); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java index 7b00b27..834c35c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/StateBackendTestBase.java @@ -84,7 +84,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { @After public void teardown() throws Exception { - this.backend.dispose(); + this.backend.discardState(); cleanup(); } @@ -154,7 +154,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("u3", state.value()); assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); backend.injectKeyValueStateSnapshots((HashMap) snapshot1); @@ -174,7 +174,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("2", restored1.value()); assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); backend.injectKeyValueStateSnapshots((HashMap) snapshot2); @@ -254,7 +254,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } } - backend.dispose(); + backend.discardState(); backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); backend.injectKeyValueStateSnapshots((HashMap) snapshot1); @@ -334,7 +334,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("u3", joiner.join(state.get())); assertEquals("u3", joiner.join(getSerializedList(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -355,7 +355,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("2", joiner.join(restored1.get())); assertEquals("2", joiner.join(getSerializedList(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer))); - backend.dispose(); + backend.discardState(); // restore the second snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -452,7 +452,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("u3", state.get()); assertEquals("u3", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -473,7 +473,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("2", restored1.get()); assertEquals("2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); // restore the second snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -574,7 +574,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("Fold-Initial:,103", state.get()); assertEquals("Fold-Initial:,103", getSerializedValue(kvState, 3, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -595,7 +595,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { assertEquals("Fold-Initial:,2", restored1.get()); assertEquals("Fold-Initial:,2", getSerializedValue(restoredKvState1, 2, keySerializer, VoidNamespace.INSTANCE, namespaceSerializer, valueSerializer)); - backend.dispose(); + backend.discardState(); // restore the second snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -653,7 +653,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } } - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -713,7 +713,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } } - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -775,7 +775,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } } - backend.dispose(); + backend.discardState(); // restore the first snapshot and validate it backend.initializeForJob(new DummyEnvironment("test", 1, 0), "test_op", IntSerializer.INSTANCE); @@ -1030,7 +1030,7 @@ public abstract class StateBackendTestBase<B extends AbstractStateBackend> { } // Verify unregistered - backend.dispose(); + backend.discardState(); verify(listener, times(1)).notifyKvStateUnregistered( eq(env.getJobID()), eq(env.getJobVertexId()), eq(0), eq("banana")); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java new file mode 100644 index 0000000..1d45115 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/filesystem/FsCheckpointStateOutputStreamTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.state.filesystem; + +import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.core.fs.Path; +import org.apache.flink.runtime.state.AbstractStateBackend; + +import org.apache.flink.runtime.state.StreamStateHandle; +import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; +import org.junit.Test; + +import java.io.DataInputStream; +import java.io.File; +import java.io.InputStream; +import java.util.Random; + +import static org.junit.Assert.*; + +public class FsCheckpointStateOutputStreamTest { + + /** The temp dir, obtained in a platform neutral way */ + private static final Path TEMP_DIR_PATH = new Path(new File(System.getProperty("java.io.tmpdir")).toURI()); + + + @Test(expected = IllegalArgumentException.class) + public void testWrongParameters() { + // this should fail + new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 4000, 5000); + } + + + @Test + public void testEmptyState() throws Exception { + AbstractStateBackend.CheckpointStateOutputStream stream = new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), 1024, 512); + + StreamStateHandle handle = stream.closeAndGetHandle(); + assertTrue(handle instanceof ByteStreamStateHandle); + + InputStream inStream = handle.openInputStream(); + assertEquals(-1, inStream.read()); + } + + @Test + public void testStateBlowMemThreshold() throws Exception { + runTest(222, 999, 512, false); + } + + @Test + public void testStateOneBufferAboveThreshold() throws Exception { + runTest(896, 1024, 15, true); + } + + @Test + public void testStateAboveMemThreshold() throws Exception { + runTest(576446, 259, 17, true); + } + + @Test + public void testZeroThreshold() throws Exception { + runTest(16678, 4096, 0, true); + } + + private void runTest(int numBytes, int bufferSize, int threshold, boolean expectFile) throws Exception { + AbstractStateBackend.CheckpointStateOutputStream stream = + new FsStateBackend.FsCheckpointStateOutputStream( + TEMP_DIR_PATH, FileSystem.getLocalFileSystem(), bufferSize, threshold); + + Random rnd = new Random(); + byte[] original = new byte[numBytes]; + byte[] bytes = new byte[original.length]; + + rnd.nextBytes(original); + System.arraycopy(original, 0, bytes, 0, original.length); + + // the test writes a mixture of writing individual bytes and byte arrays + int pos = 0; + while (pos < bytes.length) { + boolean single = rnd.nextBoolean(); + if (single) { + stream.write(bytes[pos++]); + } + else { + int num = rnd.nextInt(Math.min(10, bytes.length - pos)); + stream.write(bytes, pos, num); + pos += num; + } + } + + StreamStateHandle handle = stream.closeAndGetHandle(); + if (expectFile) { + assertTrue(handle instanceof FileStateHandle); + } else { + assertTrue(handle instanceof ByteStreamStateHandle); + } + + // make sure the writing process did not alter the original byte array + assertArrayEquals(original, bytes); + + InputStream inStream = handle.openInputStream(); + byte[] validation = new byte[bytes.length]; + + DataInputStream dataInputStream = new DataInputStream(inStream); + dataInputStream.readFully(validation); + + assertArrayEquals(bytes, validation); + + handle.discardState(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index fc1a5df..429fc6b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -44,16 +44,19 @@ import org.apache.flink.runtime.jobgraph.tasks.StatefulTask; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.query.TaskKvStateRegistry; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.ChainedStateHandle; +import org.apache.flink.runtime.state.KeyGroupsStateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; + import org.apache.flink.util.SerializedValue; import org.junit.Before; import org.junit.Test; import scala.concurrent.duration.FiniteDuration; -import java.io.Serializable; import java.net.URL; import java.util.Collections; +import java.util.List; import java.util.concurrent.TimeUnit; import static org.junit.Assert.assertFalse; @@ -179,8 +182,8 @@ public class TaskAsyncCallTest { new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")), mock(TaskMetricGroup.class)); } - - public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask<StateHandle<Serializable>> { + + public static class CheckpointsInOrderInvokable extends AbstractInvokable implements StatefulTask { private volatile long lastCheckpointId = 0; @@ -204,7 +207,10 @@ public class TaskAsyncCallTest { } @Override - public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception {} + public void setInitialState(ChainedStateHandle<StreamStateHandle> chainedState, + List<KeyGroupsStateHandle> keyGroupsState) throws Exception { + + } @Override public boolean triggerCheckpoint(long checkpointId, long timestamp) { http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java index 7505bfc..7e8868c 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/zookeeper/ZooKeeperStateHandleStoreITCase.java @@ -22,7 +22,7 @@ import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.api.BackgroundCallback; import org.apache.curator.framework.api.CuratorEvent; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.RetrievableStateHandle; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.TestLogger; import org.apache.zookeeper.CreateMode; @@ -97,7 +97,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created assertEquals(1, store.getAll().size()); - assertEquals(state, store.get(pathInZooKeeper).getState(null)); + assertEquals(state, store.get(pathInZooKeeper).retrieveState()); // Path created and is persistent Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); @@ -106,9 +106,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Data is equal @SuppressWarnings("unchecked") - Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject( ZooKeeper.getClient().getData().forPath(pathInZooKeeper), - ClassLoader.getSystemClassLoader())).getState(null); + ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(state, actual); } @@ -149,7 +149,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created assertEquals(i + 1, store.getAll().size()); - assertEquals(state, longStateStorage.getStateHandles().get(i).getState(null)); + assertEquals(state, longStateStorage.getStateHandles().get(i).retrieveState()); // Path created Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); @@ -166,9 +166,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Data is equal @SuppressWarnings("unchecked") - Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject( ZooKeeper.getClient().getData().forPath(pathInZooKeeper), - ClassLoader.getSystemClassLoader())).getState(null); + ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(state, actual); } @@ -218,7 +218,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created and discarded assertEquals(1, stateHandleProvider.getStateHandles().size()); - assertEquals(state, stateHandleProvider.getStateHandles().get(0).getState(null)); + assertEquals(state, stateHandleProvider.getStateHandles().get(0).retrieveState()); assertEquals(1, stateHandleProvider.getStateHandles().get(0).getNumberOfDiscardCalls()); } @@ -245,8 +245,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handles created assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null)); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null)); + assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); + assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); // Path created and is persistent Stat stat = ZooKeeper.getClient().checkExists().forPath(pathInZooKeeper); @@ -255,9 +255,9 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Data is equal @SuppressWarnings("unchecked") - Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject( ZooKeeper.getClient().getData().forPath(pathInZooKeeper), - ClassLoader.getSystemClassLoader())).getState(null); + ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(replaceState, actual); } @@ -267,7 +267,7 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { */ @Test(expected = Exception.class) public void testReplaceNonExistingPath() throws Exception { - StateStorageHelper<Long> stateStorage = new LongStateStorage(); + RetrievableStateStorageHelper<Long> stateStorage = new LongStateStorage(); ZooKeeperStateHandleStore<Long> store = new ZooKeeperStateHandleStore<>( ZooKeeper.getClient(), stateStorage); @@ -307,15 +307,15 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Verify // State handle created and discarded assertEquals(2, stateHandleProvider.getStateHandles().size()); - assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).getState(null)); - assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).getState(null)); + assertEquals(initialState, stateHandleProvider.getStateHandles().get(0).retrieveState()); + assertEquals(replaceState, stateHandleProvider.getStateHandles().get(1).retrieveState()); assertEquals(1, stateHandleProvider.getStateHandles().get(1).getNumberOfDiscardCalls()); // Initial value @SuppressWarnings("unchecked") - Long actual = ((StateHandle<Long>) InstantiationUtil.deserializeObject( + Long actual = ((RetrievableStateHandle<Long>) InstantiationUtil.deserializeObject( ZooKeeper.getClient().getData().forPath(pathInZooKeeper), - ClassLoader.getSystemClassLoader())).getState(null); + ClassLoader.getSystemClassLoader())).retrieveState(); assertEquals(initialState, actual); } @@ -339,10 +339,10 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { assertEquals(-1, store.exists(pathInZooKeeper)); store.add(pathInZooKeeper, state); - StateHandle<Long> actual = store.get(pathInZooKeeper); + RetrievableStateHandle<Long> actual = store.get(pathInZooKeeper); // Verify - assertEquals(state, actual.getState(null)); + assertEquals(state, actual.retrieveState()); assertTrue(store.exists(pathInZooKeeper) >= 0); } @@ -384,8 +384,8 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); } - for (Tuple2<StateHandle<Long>, String> val : store.getAll()) { - assertTrue(expected.remove(val.f0.getState(null))); + for (Tuple2<RetrievableStateHandle<Long>, String> val : store.getAll()) { + assertTrue(expected.remove(val.f0.retrieveState())); } assertEquals(0, expected.size()); } @@ -412,11 +412,11 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { store.add(pathInZooKeeper, val, CreateMode.PERSISTENT_SEQUENTIAL); } - List<Tuple2<StateHandle<Long>, String>> actual = store.getAllSortedByName(); + List<Tuple2<RetrievableStateHandle<Long>, String>> actual = store.getAllSortedByName(); assertEquals(expected.length, actual.size()); for (int i = 0; i < expected.length; i++) { - assertEquals(expected[i], actual.get(i).f0.getState(null)); + assertEquals(expected[i], actual.get(i).f0.retrieveState()); } } @@ -540,24 +540,24 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { // Simple test helpers // --------------------------------------------------------------------------------------------- - private static class LongStateStorage implements StateStorageHelper<Long> { + private static class LongStateStorage implements RetrievableStateStorageHelper<Long> { - private final List<LongStateHandle> stateHandles = new ArrayList<>(); + private final List<LongRetrievableStateHandle> stateHandles = new ArrayList<>(); @Override - public StateHandle<Long> store(Long state) throws Exception { - LongStateHandle stateHandle = new LongStateHandle(state); + public RetrievableStateHandle<Long> store(Long state) throws Exception { + LongRetrievableStateHandle stateHandle = new LongRetrievableStateHandle(state); stateHandles.add(stateHandle); return stateHandle; } - List<LongStateHandle> getStateHandles() { + List<LongRetrievableStateHandle> getStateHandles() { return stateHandles; } } - private static class LongStateHandle implements StateHandle<Long> { + private static class LongRetrievableStateHandle implements RetrievableStateHandle<Long> { private static final long serialVersionUID = -3555329254423838912L; @@ -565,12 +565,12 @@ public class ZooKeeperStateHandleStoreITCase extends TestLogger { private int numberOfDiscardCalls; - public LongStateHandle(Long state) { + public LongRetrievableStateHandle(Long state) { this.state = state; } @Override - public Long getState(ClassLoader ignored) throws Exception { + public Long retrieveState() throws Exception { return state; } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java index 9388818..4822f91 100644 --- a/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java +++ b/flink-streaming-connectors/flink-connector-cassandra/src/test/java/org/apache/flink/streaming/connectors/cassandra/CassandraConnectorITCase.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -46,11 +46,10 @@ import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.runtime.operators.WriteAheadSinkTestBase; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; -import org.apache.flink.streaming.runtime.tasks.OneInputStreamTaskTestHarness; + +import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.streaming.util.TestStreamEnvironment; import org.apache.flink.test.util.ForkableFlinkMiniCluster; - import org.apache.flink.test.util.TestEnvironment; import org.junit.After; import org.junit.AfterClass; @@ -276,8 +275,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void verifyResultsIdealCircumstances( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); @@ -294,8 +292,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void verifyResultsDataPersistenceUponMissedNotify( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); @@ -312,8 +309,7 @@ public class CassandraConnectorITCase extends WriteAheadSinkTestBase<Tuple3<Stri @Override protected void verifyResultsDataDiscardingUponRestore( - OneInputStreamTaskTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, - OneInputStreamTask<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> task, + OneInputStreamOperatorTestHarness<Tuple3<String, Integer, Integer>, Tuple3<String, Integer, Integer>> harness, CassandraTupleWriteAheadSink<Tuple3<String, Integer, Integer>> sink) { ResultSet result = session.execute(SELECT_DATA_QUERY); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java index 828dcc9..e274fdd 100644 --- a/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java +++ b/flink-streaming-connectors/flink-connector-filesystem/src/test/java/org/apache/flink/streaming/connectors/fs/bucketing/BucketingSinkTest.java @@ -28,13 +28,13 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.operators.StreamSink; import org.apache.flink.streaming.connectors.fs.AvroKeyValueSinkWriter; import org.apache.flink.streaming.connectors.fs.Clock; import org.apache.flink.streaming.connectors.fs.SequenceFileWriter; import org.apache.flink.streaming.connectors.fs.StringWriter; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.streaming.runtime.tasks.TestTimeServiceProvider; import org.apache.flink.streaming.runtime.tasks.TimeServiceProvider; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -137,11 +137,11 @@ public class BucketingSinkTest { // snapshot but don't call notify to simulate a notify that never // arrives, the sink should move pending files in restore() in that case - StreamTaskState snapshot1 = testHarness.snapshot(0, 0); + StreamStateHandle snapshot1 = testHarness.snapshot(0, 0); testHarness = createTestSink(dataDir, clock); testHarness.setup(); - testHarness.restore(snapshot1, 1); + testHarness.restore(snapshot1); testHarness.open(); testHarness.processElement(new StreamRecord<>("Hello")); http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index fda5efd..6d67560 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -24,11 +24,10 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.Counter; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; @@ -36,13 +35,11 @@ import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.InputStream; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; import java.io.Serializable; @@ -376,14 +373,10 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A // --------------------- Checkpointing -------------------------- @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - - final AbstractStateBackend.CheckpointStateOutputStream os = - this.getStateBackend().createCheckpointStateOutputStream(checkpointId, timestamp); + public void snapshotState(FSDataOutputStream os, long checkpointId, long timestamp) throws Exception { + super.snapshotState(os, checkpointId, timestamp); final ObjectOutputStream oos = new ObjectOutputStream(os); - final AbstractStateBackend.CheckpointStateOutputView ov = new AbstractStateBackend.CheckpointStateOutputView(os); Tuple3<List<FileInputSplit>, FileInputSplit, S> readerState = this.reader.getReaderState(); List<FileInputSplit> pendingSplits = readerState.f0; @@ -392,35 +385,28 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A // write the current split oos.writeObject(currSplit); - - // write the pending ones - ov.writeInt(pendingSplits.size()); + oos.writeInt(pendingSplits.size()); for (FileInputSplit split : pendingSplits) { oos.writeObject(split); } // write the state of the reading channel oos.writeObject(formatState); - taskState.setOperatorState(os.closeAndGetHandle()); - return taskState; + oos.flush(); } @Override - public void restoreState(StreamTaskState state) throws Exception { - super.restoreState(state); - - StreamStateHandle stream = (StreamStateHandle) state.getOperatorState(); + public void restoreState(FSDataInputStream is) throws Exception { + super.restoreState(is); - final InputStream is = stream.getState(getUserCodeClassloader()); final ObjectInputStream ois = new ObjectInputStream(is); - final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(is); // read the split that was being read FileInputSplit currSplit = (FileInputSplit) ois.readObject(); // read the pending splits list List<FileInputSplit> pendingSplits = new LinkedList<>(); - int noOfSplits = div.readInt(); + int noOfSplits = ois.readInt(); for (int i = 0; i < noOfSplits; i++) { FileInputSplit split = (FileInputSplit) ois.readObject(); pendingSplits.add(split); @@ -435,6 +421,5 @@ public class ContinuousFileReaderOperator<OUT, S extends Serializable> extends A "The reader state has already been initialized."); this.readerState = new Tuple3<>(pendingSplits, currSplit, formatState); - div.close(); } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index 15bb384..59ecd15 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -25,6 +25,9 @@ import org.apache.flink.api.common.state.StateDescriptor; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.metrics.Counter; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.runtime.state.AsynchronousKvStateSnapshot; import org.apache.flink.metrics.MetricGroup; import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.runtime.state.KvStateSnapshot; @@ -35,11 +38,14 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; +import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.DataInputStream; +import java.io.DataOutputStream; import java.util.HashMap; +import java.util.Set; import java.util.concurrent.ScheduledFuture; /** @@ -91,7 +97,7 @@ public abstract class AbstractStreamOperator<OUT> private transient KeySelector<?, ?> stateKeySelector2; /** The state backend that stores the state and checkpoints for this task */ - private AbstractStateBackend stateBackend = null; + private transient AbstractStateBackend stateBackend; protected MetricGroup metrics; // ------------------------------------------------------------------------ @@ -164,7 +170,7 @@ public abstract class AbstractStreamOperator<OUT> if (stateBackend != null) { try { stateBackend.close(); - stateBackend.dispose(); + stateBackend.discardState(); } catch (Exception e) { throw new RuntimeException("Error while closing/disposing state backend.", e); } @@ -176,30 +182,45 @@ public abstract class AbstractStreamOperator<OUT> // ------------------------------------------------------------------------ @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - // here, we deal with key/value state snapshots - - StreamTaskState state = new StreamTaskState(); - - if (stateBackend != null) { - HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> partitionedSnapshots = - stateBackend.snapshotPartitionedState(checkpointId, timestamp); - if (partitionedSnapshots != null) { - state.setKvStates(partitionedSnapshots); + public void snapshotState(FSDataOutputStream out, + long checkpointId, + long timestamp) throws Exception { + + HashMap<String, KvStateSnapshot<?, ?, ?, ?, ?>> keyedState = + stateBackend.snapshotPartitionedState(checkpointId,timestamp); + + // Materialize asynchronous snapshots, if any + if (keyedState != null) { + Set<String> keys = keyedState.keySet(); + for (String key: keys) { + if (keyedState.get(key) instanceof AsynchronousKvStateSnapshot) { + AsynchronousKvStateSnapshot<?, ?, ?, ?, ?> asyncHandle = (AsynchronousKvStateSnapshot<?, ?, ?, ?, ?>) keyedState.get(key); + keyedState.put(key, asyncHandle.materialize()); + } } } - return state; + byte[] serializedSnapshot = InstantiationUtil.serializeObject(keyedState); + + DataOutputStream dos = new DataOutputStream(out); + dos.writeInt(serializedSnapshot.length); + dos.write(serializedSnapshot); + + dos.flush(); + } - + @Override - @SuppressWarnings("rawtypes,unchecked") - public void restoreState(StreamTaskState state) throws Exception { - // restore the key/value state. the actual restore happens lazily, when the function requests - // the state again, because the restore method needs information provided by the user function - if (stateBackend != null) { - stateBackend.injectKeyValueStateSnapshots((HashMap)state.getKvStates()); - } + public void restoreState(FSDataInputStream in) throws Exception { + DataInputStream dis = new DataInputStream(in); + int size = dis.readInt(); + byte[] serializedSnapshot = new byte[size]; + dis.readFully(serializedSnapshot); + + HashMap<String, KvStateSnapshot> keyedState = + InstantiationUtil.deserializeObject(serializedSnapshot, getUserCodeClassloader()); + + stateBackend.injectKeyValueStateSnapshots(keyedState); } @Override @@ -249,8 +270,8 @@ public abstract class AbstractStreamOperator<OUT> } /** - * Register a timer callback. At the specified time the provided {@link Triggerable} will - * be invoked. This call is guaranteed to not happen concurrently with method calls on the operator. + * Register a timer callback. At the specified time the {@link Triggerable} will be invoked. + * This call is guaranteed to not happen concurrently with method calls on the operator. * * @param time The absolute time in milliseconds. * @param target The target to be triggered. @@ -281,11 +302,18 @@ public abstract class AbstractStreamOperator<OUT> */ @SuppressWarnings("unchecked") protected <S extends State, N> S getPartitionedState(N namespace, TypeSerializer<N> namespaceSerializer, StateDescriptor<S, ?> stateDescriptor) throws Exception { - return getStateBackend().getPartitionedState(namespace, (TypeSerializer<Object>) namespaceSerializer, - stateDescriptor); + if (stateBackend != null) { + return stateBackend.getPartitionedState( + namespace, + namespaceSerializer, + stateDescriptor); + } else { + throw new RuntimeException("Cannot create partitioned state. The key grouped state " + + "backend has not been set. This indicates that the operator is not " + + "partitioned/keyed."); + } } - @Override @SuppressWarnings({"unchecked", "rawtypes"}) public void setKeyContextElement1(StreamRecord record) throws Exception { @@ -300,17 +328,25 @@ public abstract class AbstractStreamOperator<OUT> public void setKeyContextElement2(StreamRecord record) throws Exception { if (stateKeySelector2 != null) { Object key = ((KeySelector) stateKeySelector2).getKey(record.getValue()); - getStateBackend().setCurrentKey(key); + + setKeyContext(key); } } @SuppressWarnings({"unchecked", "rawtypes"}) public void setKeyContext(Object key) { - if (stateKeySelector1 != null) { - stateBackend.setCurrentKey(key); + if (stateBackend != null) { + try { + stateBackend.setCurrentKey(key); + } catch (Exception e) { + throw new RuntimeException("Exception occurred while setting the current key context.", e); + } + } else { + throw new RuntimeException("Could not set the current key context, because the " + + "AbstractStateBackend has not been initialized."); } } - + // ------------------------------------------------------------------------ // Context and chaining properties // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index 1ddd934..b1bc531 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -18,6 +18,8 @@ package org.apache.flink.streaming.api.operators; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; import java.io.Serializable; import org.apache.flink.annotation.PublicEvolving; @@ -26,14 +28,13 @@ import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.util.FunctionUtils; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.runtime.state.CheckpointListener; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import static java.util.Objects.requireNonNull; @@ -117,8 +118,8 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends // ------------------------------------------------------------------------ @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - StreamTaskState state = super.snapshotOperatorState(checkpointId, timestamp); + public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); if (userFunction instanceof Checkpointed) { @SuppressWarnings("unchecked") @@ -127,45 +128,39 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends Serializable udfState; try { udfState = chkFunction.snapshotState(checkpointId, timestamp); - } - catch (Exception e) { - throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e); - } - - if (udfState != null) { - try { - AbstractStateBackend stateBackend = getStateBackend(); - StateHandle<Serializable> handle = - stateBackend.checkpointStateSerializable(udfState, checkpointId, timestamp); - state.setFunctionState(handle); - } - catch (Exception e) { - throw new Exception("Failed to add the state snapshot of the function to the checkpoint: " - + e.getMessage(), e); + if (udfState != null) { + out.write(1); + ObjectOutputStream os = new ObjectOutputStream(out); + os.writeObject(udfState); + os.flush(); + } else { + out.write(0); } + } catch (Exception e) { + throw new Exception("Failed to draw state snapshot from function: " + e.getMessage(), e); } } - - return state; } @Override - public void restoreState(StreamTaskState state) throws Exception { - super.restoreState(state); - - StateHandle<Serializable> stateHandle = state.getFunctionState(); - - if (userFunction instanceof Checkpointed && stateHandle != null) { + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + + if (userFunction instanceof Checkpointed) { @SuppressWarnings("unchecked") Checkpointed<Serializable> chkFunction = (Checkpointed<Serializable>) userFunction; - - Serializable functionState = stateHandle.getState(getUserCodeClassloader()); - if (functionState != null) { - try { - chkFunction.restoreState(functionState); - } - catch (Exception e) { - throw new Exception("Failed to restore state to function: " + e.getMessage(), e); + + int hasUdfState = in.read(); + + if (hasUdfState == 1) { + ObjectInputStream ois = new ObjectInputStream(in); + Serializable functionState = (Serializable) ois.readObject(); + if (functionState != null) { + try { + chkFunction.restoreState(functionState); + } catch (Exception e) { + throw new Exception("Failed to restore state to function: " + e.getMessage(), e); + } } } } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java index 3e38165..3411a60 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperator.java @@ -21,10 +21,11 @@ import java.io.Serializable; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.StreamTask; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; /** * Basic interface for stream operators. Implementers would implement one of @@ -94,17 +95,15 @@ public interface StreamOperator<OUT> extends Serializable { * (if the operator is stateful) and the key/value state (if it is being used and has been * initialized). * + * @param out The stream to which we have to write our state. * @param checkpointId The ID of the checkpoint. * @param timestamp The timestamp of the checkpoint. * - * @return The StreamTaskState object, possibly containing the snapshots for the - * operator and key/value state. - * * @throws Exception Forwards exceptions that occur while drawing snapshots from the operator * and the key/value state. */ - StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception; - + void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception; + /** * Restores the operator state, if this operator's execution is recovering from a checkpoint. * This method restores the operator state (if the operator is stateful) and the key/value state @@ -113,13 +112,12 @@ public interface StreamOperator<OUT> extends Serializable { * <p>This method is called after {@link #setup(StreamTask, StreamConfig, Output)} * and before {@link #open()}. * - * @param state The state of operator that was snapshotted as part of checkpoint - * from which the execution is restored. + * @param in The stream from which we have to restore our state. * * @throws Exception Exceptions during state restore should be forwarded, so that the system can * properly react to failed state restore and fail the execution attempt. */ - void restoreState(StreamTaskState state) throws Exception; + void restoreState(FSDataInputStream in) throws Exception; /** * Called when the checkpoint with the given ID is completed and acknowledged on the JobManager. http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java index 34c62fb..8d074cc 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/GenericWriteAheadSink.java @@ -19,17 +19,19 @@ package org.apache.flink.streaming.runtime.operators; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.io.disk.InputViewIterator; import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.util.ReusingMutableToRegularIteratorWrapper; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; -import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -54,7 +56,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I protected static final Logger LOG = LoggerFactory.getLogger(GenericWriteAheadSink.class); private final CheckpointCommitter committer; - private transient AbstractStateBackend.CheckpointStateOutputView out; + private transient AbstractStateBackend.CheckpointStateOutputStream out; protected final TypeSerializer<IN> serializer; private final String id; @@ -89,7 +91,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I private void saveHandleInState(final long checkpointId, final long timestamp) throws Exception { //only add handle if a new OperatorState was created since the last snapshot if (out != null) { - StateHandle<DataInputView> handle = out.closeAndGetHandle(); + StreamStateHandle handle = out.closeAndGetHandle(); if (state.pendingHandles.containsKey(checkpointId)) { //we already have a checkpoint stored for that ID that may have been partially written, //so we discard this "alternate version" and use the stored checkpoint @@ -102,18 +104,21 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I } @Override - public StreamTaskState snapshotOperatorState(final long checkpointId, final long timestamp) throws Exception { - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + public void snapshotState(FSDataOutputStream out, + long checkpointId, + long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); + saveHandleInState(checkpointId, timestamp); - taskState.setFunctionState(state); - return taskState; + + InstantiationUtil.serializeObject(out, state); } @Override - public void restoreState(StreamTaskState state) throws Exception { - super.restoreState(state); - this.state = (ExactlyOnceState) state.getFunctionState(); - out = null; + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); + + this.state = InstantiationUtil.deserializeObject(in, getUserCodeClassloader()); } private void cleanState() throws Exception { @@ -142,9 +147,9 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I if (pastCheckpointId <= checkpointId) { try { if (!committer.isCheckpointCommitted(pastCheckpointId)) { - Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(pastCheckpointId); - DataInputView in = handle.f1.getState(getUserCodeClassloader()); - boolean success = sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(in, serializer), serializer), handle.f0); + Tuple2<Long, StreamStateHandle> handle = state.pendingHandles.get(pastCheckpointId); + FSDataInputStream in = handle.f1.openInputStream(); + boolean success = sendValues(new ReusingMutableToRegularIteratorWrapper<>(new InputViewIterator<>(new DataInputViewStreamWrapper(in), serializer), serializer), handle.f0); if (success) { //if the sending has failed we will retry on the next notify committer.commitCheckpoint(pastCheckpointId); checkpointsToRemove.add(pastCheckpointId); @@ -159,7 +164,7 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I } } for (Long toRemove : checkpointsToRemove) { - Tuple2<Long, StateHandle<DataInputView>> handle = state.pendingHandles.get(toRemove); + Tuple2<Long, StreamStateHandle> handle = state.pendingHandles.get(toRemove); state.pendingHandles.remove(toRemove); handle.f1.discardState(); } @@ -181,9 +186,9 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I IN value = element.getValue(); //generate initial operator state if (out == null) { - out = getStateBackend().createCheckpointStateOutputView(0, 0); + out = getStateBackend().createCheckpointStateOutputStream(0, 0); } - serializer.serialize(value, out); + serializer.serialize(value, new DataOutputViewStreamWrapper(out)); } @Override @@ -195,59 +200,21 @@ public abstract class GenericWriteAheadSink<IN> extends AbstractStreamOperator<I * This state is used to keep a list of all StateHandles (essentially references to past OperatorStates) that were * used since the last completed checkpoint. **/ - public static class ExactlyOnceState implements StateHandle<Serializable> { + public static class ExactlyOnceState implements Serializable { private static final long serialVersionUID = -3571063495273460743L; - protected TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> pendingHandles; + protected TreeMap<Long, Tuple2<Long, StreamStateHandle>> pendingHandles; public ExactlyOnceState() { pendingHandles = new TreeMap<>(); } - @Override - public TreeMap<Long, Tuple2<Long, StateHandle<DataInputView>>> getState(ClassLoader userCodeClassLoader) throws Exception { + public TreeMap<Long, Tuple2<Long, StreamStateHandle>> getState(ClassLoader userCodeClassLoader) throws Exception { return pendingHandles; } @Override - public void discardState() throws Exception { - //we specifically want the state to survive failed jobs, so we don't discard anything - } - - @Override - public long getStateSize() throws Exception { - int stateSize = 0; - for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) { - stateSize += pair.f1.getStateSize(); - } - return stateSize; - } - - @Override - public void close() throws IOException { - Throwable exception = null; - - for (Tuple2<Long, StateHandle<DataInputView>> pair : pendingHandles.values()) { - StateHandle<DataInputView> handle = pair.f1; - if (handle != null) { - try { - handle.close(); - } - catch (Throwable t) { - if (exception != null) { - exception = t; - } - } - } - } - - if (exception != null) { - ExceptionUtils.rethrowIOException(exception); - } - } - - @Override public String toString() { return this.pendingHandles.toString(); } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java index fdc8117..2c95099 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/AbstractAlignedProcessingTimeWindowOperator.java @@ -24,18 +24,18 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.util.MathUtils; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; import org.apache.flink.streaming.api.operators.TimestampedCollector; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; -import org.apache.flink.runtime.state.AbstractStateBackend; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import static java.util.Objects.requireNonNull; @@ -244,36 +244,35 @@ public abstract class AbstractAlignedProcessingTimeWindowOperator<KEY, IN, OUT, // ------------------------------------------------------------------------ @Override - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); - + public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { + super.snapshotState(out, checkpointId, timestamp); + // we write the panes with the key/value maps into the stream, as well as when this state // should have triggered and slided - AbstractStateBackend.CheckpointStateOutputView out = - getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - out.writeLong(nextEvaluationTime); - out.writeLong(nextSlideTime); - panes.writeToOutput(out, keySerializer, stateTypeSerializer); - - taskState.setOperatorState(out.closeAndGetHandle()); - return taskState; + DataOutputViewStreamWrapper outView = new DataOutputViewStreamWrapper(out); + + outView.writeLong(nextEvaluationTime); + outView.writeLong(nextSlideTime); + + panes.writeToOutput(outView, keySerializer, stateTypeSerializer); + + outView.flush(); } @Override - public void restoreState(StreamTaskState taskState) throws Exception { - super.restoreState(taskState); + public void restoreState(FSDataInputStream in) throws Exception { + super.restoreState(in); - @SuppressWarnings("unchecked") - StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); - DataInputView in = inputState.getState(getUserCodeClassloader()); - - final long nextEvaluationTime = in.readLong(); - final long nextSlideTime = in.readLong(); + DataInputViewStreamWrapper inView = new DataInputViewStreamWrapper(in); + + final long nextEvaluationTime = inView.readLong(); + final long nextSlideTime = inView.readLong(); AbstractKeyedTimePanes<IN, KEY, STATE, OUT> panes = createPanes(keySelector, function); - panes.readFromInput(in, keySerializer, stateTypeSerializer); - + + panes.readFromInput(inView, keySerializer, stateTypeSerializer); + restoredState = new RestoredState<>(panes, nextEvaluationTime, nextSlideTime); } http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index 12ed60e..dbdd660 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -39,11 +39,13 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.InputTypeConfigurable; import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.api.java.typeutils.runtime.TupleSerializer; +import org.apache.flink.core.fs.FSDataInputStream; +import org.apache.flink.core.fs.FSDataOutputStream; import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; import org.apache.flink.metrics.MetricGroup; -import org.apache.flink.runtime.state.AbstractStateBackend; -import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.runtime.state.VoidNamespace; import org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; @@ -59,7 +61,6 @@ import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.streaming.runtime.operators.Triggerable; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalWindowFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskState; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -857,7 +858,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> @Override @SuppressWarnings("unchecked") - public StreamTaskState snapshotOperatorState(long checkpointId, long timestamp) throws Exception { + public void snapshotState(FSDataOutputStream out, long checkpointId, long timestamp) throws Exception { if (mergingWindowsByKey != null) { TupleSerializer<Tuple2<W, W>> tupleSerializer = new TupleSerializer<>((Class) Tuple2.class, new TypeSerializer[] {windowSerializer, windowSerializer} ); @@ -870,29 +871,16 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> } } - StreamTaskState taskState = super.snapshotOperatorState(checkpointId, timestamp); + snapshotTimers(new DataOutputViewStreamWrapper(out)); - AbstractStateBackend.CheckpointStateOutputView out = - getStateBackend().createCheckpointStateOutputView(checkpointId, timestamp); - - snapshotTimers(out); - - taskState.setOperatorState(out.closeAndGetHandle()); - - return taskState; + super.snapshotState(out, checkpointId, timestamp); } @Override - public void restoreState(StreamTaskState taskState) throws Exception { - super.restoreState(taskState); - - final ClassLoader userClassloader = getUserCodeClassloader(); - - @SuppressWarnings("unchecked") - StateHandle<DataInputView> inputState = (StateHandle<DataInputView>) taskState.getOperatorState(); - DataInputView in = inputState.getState(userClassloader); + public void restoreState(FSDataInputStream in) throws Exception { + restoreTimers(new DataInputViewStreamWrapper(in)); - restoreTimers(in); + super.restoreState(in); } private void restoreTimers(DataInputView in ) throws IOException { http://git-wip-us.apache.org/repos/asf/flink/blob/847ead01/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java index 5bcf41b..108a3ae 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/partitioner/KeyGroupStreamPartitioner.java @@ -21,6 +21,7 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.api.common.state.KeyGroupAssigner; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.runtime.plugable.SerializationDelegate; +import org.apache.flink.runtime.state.KeyGroupRange; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.util.Preconditions; @@ -60,8 +61,10 @@ public class KeyGroupStreamPartitioner<T, K> extends StreamPartitioner<T> implem } catch (Exception e) { throw new RuntimeException("Could not extract key from " + record.getInstance().getValue(), e); } - returnArray[0] = keyGroupAssigner.getKeyGroupIndex(key) % numberOfOutputChannels; - + returnArray[0] = KeyGroupRange.computeOperatorIndexForKeyGroup( + keyGroupAssigner.getNumberKeyGroups(), + numberOfOutputChannels, + keyGroupAssigner.getKeyGroupIndex(key)); return returnArray; }