http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java deleted file mode 100644 index 16f3769..0000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/savepoint/MigrationV0ToV1Test.java +++ /dev/null @@ -1,251 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.runtime.checkpoint.savepoint; - -import org.apache.flink.api.common.state.ValueStateDescriptor; -import org.apache.flink.api.common.typeutils.base.IntSerializer; -import org.apache.flink.api.java.tuple.Tuple4; -import org.apache.flink.core.fs.FSDataOutputStream; -import org.apache.flink.core.fs.FileSystem; -import org.apache.flink.core.fs.Path; -import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0; -import org.apache.flink.migration.runtime.checkpoint.savepoint.SavepointV0Serializer; -import org.apache.flink.migration.runtime.state.KvStateSnapshot; -import org.apache.flink.migration.runtime.state.memory.MemValueState; -import org.apache.flink.migration.runtime.state.memory.SerializedStateHandle; -import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskState; -import org.apache.flink.migration.streaming.runtime.tasks.StreamTaskStateList; -import org.apache.flink.migration.util.MigrationInstantiationUtil; -import org.apache.flink.runtime.checkpoint.SubtaskState; -import org.apache.flink.runtime.checkpoint.TaskState; -import org.apache.flink.runtime.jobgraph.JobVertexID; -import org.apache.flink.runtime.state.ChainedStateHandle; -import org.apache.flink.runtime.state.KeyGroupsStateHandle; -import org.apache.flink.runtime.state.KeyedStateHandle; -import org.apache.flink.runtime.state.StreamStateHandle; -import org.apache.flink.runtime.state.VoidNamespaceSerializer; -import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; -import org.apache.flink.util.FileUtils; -import org.apache.flink.util.InstantiationUtil; -import org.junit.Rule; -import org.junit.Test; -import org.junit.rules.TemporaryFolder; - -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.Serializable; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.List; -import java.util.concurrent.ThreadLocalRandom; - -import static 
org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -@SuppressWarnings("deprecation") -public class MigrationV0ToV1Test { - - @Rule - public TemporaryFolder tmp = new TemporaryFolder(); - - /** - * Simple test of savepoint methods. - */ - @Test - public void testSavepointMigrationV0ToV1() throws Exception { - - String target = tmp.getRoot().getAbsolutePath(); - - assertEquals(0, tmp.getRoot().listFiles().length); - - long checkpointId = ThreadLocalRandom.current().nextLong(Integer.MAX_VALUE); - int numTaskStates = 4; - int numSubtaskStates = 16; - - Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> expected = - createTaskStatesOld(numTaskStates, numSubtaskStates); - - SavepointV0 savepoint = new SavepointV0(checkpointId, expected); - - assertEquals(SavepointV0.VERSION, savepoint.getVersion()); - assertEquals(checkpointId, savepoint.getCheckpointId()); - assertEquals(expected, savepoint.getOldTaskStates()); - - assertFalse(savepoint.getOldTaskStates().isEmpty()); - - Exception latestException = null; - Path path = null; - FSDataOutputStream fdos = null; - - FileSystem fs = null; - - try { - - // Try to create a FS output stream - for (int attempt = 0; attempt < 10; attempt++) { - path = new Path(target, FileUtils.getRandomFilename("savepoint-")); - - if (fs == null) { - fs = FileSystem.get(path.toUri()); - } - - try { - fdos = fs.create(path, FileSystem.WriteMode.NO_OVERWRITE); - break; - } catch (Exception e) { - latestException = e; - } - } - - if (fdos == null) { - throw new IOException("Failed to create file output stream at " + path, latestException); - } - - try (DataOutputStream dos = new DataOutputStream(fdos)) { - dos.writeInt(SavepointStore.MAGIC_NUMBER); - dos.writeInt(savepoint.getVersion()); - SavepointV0Serializer.INSTANCE.serializeOld(savepoint, dos); - } - - ClassLoader cl = Thread.currentThread().getContextClassLoader(); - - Savepoint sp = 
SavepointStore.loadSavepoint(path.toString(), cl); - int t = 0; - for (TaskState taskState : sp.getTaskStates()) { - for (int p = 0; p < taskState.getParallelism(); ++p) { - SubtaskState subtaskState = taskState.getState(p); - ChainedStateHandle<StreamStateHandle> legacyOperatorState = subtaskState.getLegacyOperatorState(); - for (int c = 0; c < legacyOperatorState.getLength(); ++c) { - StreamStateHandle stateHandle = legacyOperatorState.get(c); - try (InputStream is = stateHandle.openInputStream()) { - Tuple4<Integer, Integer, Integer, Integer> expTestState = new Tuple4<>(0, t, p, c); - Tuple4<Integer, Integer, Integer, Integer> actTestState; - //check function state - if (p % 4 != 0) { - assertEquals(1, is.read()); - actTestState = InstantiationUtil.deserializeObject(is, cl); - assertEquals(expTestState, actTestState); - } else { - assertEquals(0, is.read()); - } - - //check operator state - expTestState.f0 = 1; - actTestState = InstantiationUtil.deserializeObject(is, cl); - assertEquals(expTestState, actTestState); - } - } - - //check keyed state - KeyedStateHandle keyedStateHandle = subtaskState.getManagedKeyedState(); - - if (t % 3 != 0) { - - assertTrue(keyedStateHandle instanceof KeyGroupsStateHandle); - - KeyGroupsStateHandle keyGroupsStateHandle = (KeyGroupsStateHandle) keyedStateHandle; - - assertEquals(1, keyGroupsStateHandle.getKeyGroupRange().getNumberOfKeyGroups()); - assertEquals(p, keyGroupsStateHandle.getGroupRangeOffsets().getKeyGroupRange().getStartKeyGroup()); - - ByteStreamStateHandle stateHandle = - (ByteStreamStateHandle) keyGroupsStateHandle.getDelegateStateHandle(); - HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = - MigrationInstantiationUtil.deserializeObject(stateHandle.getData(), cl); - - assertEquals(2, testKeyedState.size()); - for (KvStateSnapshot<?, ?, ?, ?> snapshot : testKeyedState.values()) { - MemValueState.Snapshot<?, ?, ?> castedSnapshot = (MemValueState.Snapshot<?, ?, ?>) snapshot; - byte[] data = 
castedSnapshot.getData(); - assertEquals(t, data[0]); - assertEquals(p, data[1]); - } - } else { - assertEquals(null, keyedStateHandle); - } - } - - ++t; - } - - savepoint.dispose(); - - } finally { - // Dispose - SavepointStore.removeSavepointFile(path.toString()); - } - } - - private static Collection<org.apache.flink.migration.runtime.checkpoint.TaskState> createTaskStatesOld( - int numTaskStates, int numSubtaskStates) throws Exception { - - List<org.apache.flink.migration.runtime.checkpoint.TaskState> taskStates = new ArrayList<>(numTaskStates); - - for (int i = 0; i < numTaskStates; i++) { - org.apache.flink.migration.runtime.checkpoint.TaskState taskState = - new org.apache.flink.migration.runtime.checkpoint.TaskState(new JobVertexID(), numSubtaskStates); - for (int j = 0; j < numSubtaskStates; j++) { - - StreamTaskState[] streamTaskStates = new StreamTaskState[2]; - - for (int k = 0; k < streamTaskStates.length; k++) { - StreamTaskState state = new StreamTaskState(); - Tuple4<Integer, Integer, Integer, Integer> testState = new Tuple4<>(0, i, j, k); - if (j % 4 != 0) { - state.setFunctionState(new SerializedStateHandle<Serializable>(testState)); - } - testState = new Tuple4<>(1, i, j, k); - state.setOperatorState(new SerializedStateHandle<>(testState)); - - if ((0 == k) && (i % 3 != 0)) { - HashMap<String, KvStateSnapshot<?, ?, ?, ?>> testKeyedState = new HashMap<>(2); - for (int l = 0; l < 2; ++l) { - String name = "keyed-" + l; - KvStateSnapshot<?, ?, ?, ?> testKeyedSnapshot = - new MemValueState.Snapshot<>( - IntSerializer.INSTANCE, - VoidNamespaceSerializer.INSTANCE, - IntSerializer.INSTANCE, - new ValueStateDescriptor<>(name, Integer.class, 0), - new byte[]{(byte) i, (byte) j}); - testKeyedState.put(name, testKeyedSnapshot); - } - state.setKvStates(testKeyedState); - } - streamTaskStates[k] = state; - } - - StreamTaskStateList streamTaskStateList = new StreamTaskStateList(streamTaskStates); - org.apache.flink.migration.util.SerializedValue< - 
org.apache.flink.migration.runtime.state.StateHandle<?>> handle = - new org.apache.flink.migration.util.SerializedValue< - org.apache.flink.migration.runtime.state.StateHandle<?>>(streamTaskStateList); - - taskState.putState(j, new org.apache.flink.migration.runtime.checkpoint.SubtaskState(handle, 0, 0)); - } - - taskStates.add(taskState); - } - - return taskStates; - } -}
http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java index 933c7a0..173730a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerHARecoveryTest.java @@ -61,6 +61,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.JobManagerMessages; import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.state.OperatorStateHandle; import org.apache.flink.runtime.state.memory.ByteStreamStateHandle; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.testingUtils.TestingJobManager; @@ -72,6 +73,7 @@ import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testutils.RecoverableCompletedCheckpointStore; import org.apache.flink.runtime.util.TestByteStreamStateHandleDeepCompare; import org.apache.flink.util.InstantiationUtil; +import org.apache.flink.util.Preconditions; import org.apache.flink.util.TestLogger; import akka.actor.ActorRef; @@ -98,6 +100,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; @@ -560,7 +563,8 @@ public class JobManagerHARecoveryTest extends TestLogger { TaskStateSnapshot taskStateHandles) throws Exception { int subtaskIndex = 
getIndexInSubtaskGroup(); if (subtaskIndex < recoveredStates.length) { - try (FSDataInputStream in = taskStateHandles.getSubtaskStateMappings().iterator().next().getValue().getLegacyOperatorState().openInputStream()) { + OperatorStateHandle operatorStateHandle = extractSingletonOperatorState(taskStateHandles); + try (FSDataInputStream in = operatorStateHandle.openInputStream()) { recoveredStates[subtaskIndex] = InstantiationUtil.deserializeObject(in, getUserCodeClassLoader()); } } @@ -572,11 +576,21 @@ public class JobManagerHARecoveryTest extends TestLogger { String.valueOf(UUID.randomUUID()), InstantiationUtil.serializeObject(checkpointMetaData.getCheckpointId())); + Map<String, OperatorStateHandle.StateMetaInfo> stateNameToPartitionOffsets = new HashMap<>(1); + stateNameToPartitionOffsets.put( + "test-state", + new OperatorStateHandle.StateMetaInfo(new long[]{0L}, OperatorStateHandle.Mode.SPLIT_DISTRIBUTE)); + + OperatorStateHandle operatorStateHandle = new OperatorStateHandle(stateNameToPartitionOffsets, byteStreamStateHandle); + TaskStateSnapshot checkpointStateHandles = new TaskStateSnapshot(); checkpointStateHandles.putSubtaskStateByOperatorID( OperatorID.fromJobVertexID(getEnvironment().getJobVertexId()), - new OperatorSubtaskState(byteStreamStateHandle) - ); + new OperatorSubtaskState( + Collections.singletonList(operatorStateHandle), + Collections.emptyList(), + Collections.emptyList(), + Collections.emptyList())); getEnvironment().acknowledgeCheckpoint( checkpointMetaData.getCheckpointId(), @@ -614,5 +628,17 @@ public class JobManagerHARecoveryTest extends TestLogger { public static long[] getRecoveredStates() { return recoveredStates; } + + private static OperatorStateHandle extractSingletonOperatorState(TaskStateSnapshot taskStateHandles) { + Set<Map.Entry<OperatorID, OperatorSubtaskState>> subtaskStateMappings = taskStateHandles.getSubtaskStateMappings(); + Preconditions.checkNotNull(subtaskStateMappings); + 
Preconditions.checkState(subtaskStateMappings.size() == 1); + OperatorSubtaskState subtaskState = subtaskStateMappings.iterator().next().getValue(); + Collection<OperatorStateHandle> managedOperatorState = + Preconditions.checkNotNull(subtaskState).getManagedOperatorState(); + Preconditions.checkNotNull(managedOperatorState); + Preconditions.checkState(managedOperatorState.size() == 1); + return managedOperatorState.iterator().next(); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index d022cdc..b36ac86 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -75,7 +75,6 @@ public class CheckpointMessagesTest { checkpointStateHandles.putSubtaskStateByOperatorID( new OperatorID(), new OperatorSubtaskState( - CheckpointCoordinatorTest.generateStreamStateHandle(new MyHandle()), CheckpointCoordinatorTest.generatePartitionableStateHandle(new JobVertexID(), 0, 2, 8, false), null, CheckpointCoordinatorTest.generateKeyGroupState(keyGroupRange, Collections.singletonList(new MyHandle())), http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java deleted file mode 
100644 index dd6148c..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/api/graph/StreamGraphHasherV1.java +++ /dev/null @@ -1,282 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.streaming.api.graph; - -import org.apache.flink.streaming.api.graph.StreamEdge; -import org.apache.flink.streaming.api.graph.StreamGraph; -import org.apache.flink.streaming.api.graph.StreamGraphHasher; -import org.apache.flink.streaming.api.graph.StreamNode; -import org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator; -import org.apache.flink.streaming.api.operators.ChainingStrategy; -import org.apache.flink.streaming.api.operators.StreamOperator; -import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner; - -import org.apache.flink.shaded.guava18.com.google.common.hash.HashFunction; -import org.apache.flink.shaded.guava18.com.google.common.hash.Hasher; -import org.apache.flink.shaded.guava18.com.google.common.hash.Hashing; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.nio.charset.Charset; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Arrays; -import 
java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Queue; -import java.util.Set; - -import static org.apache.flink.util.StringUtils.byteToHexString; - -/** - * StreamGraphHasher from Flink 1.1. This contains duplicated code to ensure that the algorithm does not change with - * future Flink versions. - * - * <p>DO NOT MODIFY THIS CLASS - */ -public class StreamGraphHasherV1 implements StreamGraphHasher { - - private static final Logger LOG = LoggerFactory.getLogger(StreamGraphHasherV1.class); - - @Override - public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) { - // The hash function used to generate the hash - final HashFunction hashFunction = Hashing.murmur3_128(0); - final Map<Integer, byte[]> hashes = new HashMap<>(); - - Set<Integer> visited = new HashSet<>(); - Queue<StreamNode> remaining = new ArrayDeque<>(); - - // We need to make the source order deterministic. The source IDs are - // not returned in the same order, which means that submitting the same - // program twice might result in different traversal, which breaks the - // deterministic hash assignment. - List<Integer> sources = new ArrayList<>(); - for (Integer sourceNodeId : streamGraph.getSourceIDs()) { - sources.add(sourceNodeId); - } - Collections.sort(sources); - - // - // Traverse the graph in a breadth-first manner. Keep in mind that - // the graph is not a tree and multiple paths to nodes can exist. - // - - // Start with source nodes - for (Integer sourceNodeId : sources) { - remaining.add(streamGraph.getStreamNode(sourceNodeId)); - visited.add(sourceNodeId); - } - - StreamNode currentNode; - while ((currentNode = remaining.poll()) != null) { - // Generate the hash code. Because multiple path exist to each - // node, we might not have all required inputs available to - // generate the hash code. 
- if (generateNodeHash(currentNode, hashFunction, hashes, streamGraph.isChainingEnabled())) { - // Add the child nodes - for (StreamEdge outEdge : currentNode.getOutEdges()) { - StreamNode child = outEdge.getTargetVertex(); - - if (!visited.contains(child.getId())) { - remaining.add(child); - visited.add(child.getId()); - } - } - } else { - // We will revisit this later. - visited.remove(currentNode.getId()); - } - } - - return hashes; - } - - /** - * Generates a hash for the node and returns whether the operation was - * successful. - * - * @param node The node to generate the hash for - * @param hashFunction The hash function to use - * @param hashes The current state of generated hashes - * @return <code>true</code> if the node hash has been generated. - * <code>false</code>, otherwise. If the operation is not successful, the - * hash needs be generated at a later point when all input is available. - * @throws IllegalStateException If node has user-specified hash and is - * intermediate node of a chain - */ - private boolean generateNodeHash( - StreamNode node, - HashFunction hashFunction, - Map<Integer, byte[]> hashes, - boolean isChainingEnabled) { - - // Check for user-specified ID - String userSpecifiedHash = node.getTransformationUID(); - - if (userSpecifiedHash == null) { - // Check that all input nodes have their hashes computed - for (StreamEdge inEdge : node.getInEdges()) { - // If the input node has not been visited yet, the current - // node will be visited again at a later point when all input - // nodes have been visited and their hashes set. - if (!hashes.containsKey(inEdge.getSourceId())) { - return false; - } - } - - Hasher hasher = hashFunction.newHasher(); - byte[] hash = generateDeterministicHash(node, hasher, hashes, isChainingEnabled); - - if (hashes.put(node.getId(), hash) != null) { - // Sanity check - throw new IllegalStateException("Unexpected state. Tried to add node hash " + - "twice. 
This is probably a bug in the JobGraph generator."); - } - - return true; - } else { - Hasher hasher = hashFunction.newHasher(); - byte[] hash = generateUserSpecifiedHash(node, hasher); - - for (byte[] previousHash : hashes.values()) { - if (Arrays.equals(previousHash, hash)) { - throw new IllegalArgumentException("Hash collision on user-specified ID. " + - "Most likely cause is a non-unique ID. Please check that all IDs " + - "specified via `uid(String)` are unique."); - } - } - - if (hashes.put(node.getId(), hash) != null) { - // Sanity check - throw new IllegalStateException("Unexpected state. Tried to add node hash " + - "twice. This is probably a bug in the JobGraph generator."); - } - - return true; - } - } - - /** - * Generates a hash from a user-specified ID. - */ - private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) { - hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8")); - - return hasher.hash().asBytes(); - } - - /** - * Generates a deterministic hash from node-local properties and input and - * output edges. - */ - private byte[] generateDeterministicHash( - StreamNode node, - Hasher hasher, - Map<Integer, byte[]> hashes, - boolean isChainingEnabled) { - - // Include stream node to hash. We use the current size of the computed - // hashes as the ID. We cannot use the node's ID, because it is - // assigned from a static counter. This will result in two identical - // programs having different hashes. - generateNodeLocalHash(node, hasher, hashes.size()); - - // Include chained nodes to hash - for (StreamEdge outEdge : node.getOutEdges()) { - if (isChainable(outEdge, isChainingEnabled)) { - StreamNode chainedNode = outEdge.getTargetVertex(); - - // Use the hash size again, because the nodes are chained to - // this node. This does not add a hash for the chained nodes. 
- generateNodeLocalHash(chainedNode, hasher, hashes.size()); - } - } - - byte[] hash = hasher.hash().asBytes(); - - // Make sure that all input nodes have their hash set before entering - // this loop (calling this method). - for (StreamEdge inEdge : node.getInEdges()) { - byte[] otherHash = hashes.get(inEdge.getSourceId()); - - // Sanity check - if (otherHash == null) { - throw new IllegalStateException("Missing hash for input node " - + inEdge.getSourceVertex() + ". Cannot generate hash for " - + node + "."); - } - - for (int j = 0; j < hash.length; j++) { - hash[j] = (byte) (hash[j] * 37 ^ otherHash[j]); - } - } - - if (LOG.isDebugEnabled()) { - String udfClassName = ""; - if (node.getOperator() instanceof AbstractUdfStreamOperator) { - udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator()) - .getUserFunction().getClass().getName(); - } - - LOG.debug("Generated hash '" + byteToHexString(hash) + "' for node " + - "'" + node.toString() + "' {id: " + node.getId() + ", " + - "parallelism: " + node.getParallelism() + ", " + - "user function: " + udfClassName + "}"); - } - - return hash; - } - - private boolean isChainable(StreamEdge edge, boolean isChainingEnabled) { - StreamNode upStreamVertex = edge.getSourceVertex(); - StreamNode downStreamVertex = edge.getTargetVertex(); - - StreamOperator<?> headOperator = upStreamVertex.getOperator(); - StreamOperator<?> outOperator = downStreamVertex.getOperator(); - - return downStreamVertex.getInEdges().size() == 1 - && outOperator != null - && headOperator != null - && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) - && outOperator.getChainingStrategy() == ChainingStrategy.ALWAYS - && (headOperator.getChainingStrategy() == ChainingStrategy.HEAD || - headOperator.getChainingStrategy() == ChainingStrategy.ALWAYS) - && (edge.getPartitioner() instanceof ForwardPartitioner) - && upStreamVertex.getParallelism() == downStreamVertex.getParallelism() - && isChainingEnabled; - } - - private void 
generateNodeLocalHash(StreamNode node, Hasher hasher, int id) { - hasher.putInt(id); - - hasher.putInt(node.getParallelism()); - - if (node.getOperator() instanceof AbstractUdfStreamOperator) { - String udfClassName = ((AbstractUdfStreamOperator<?, ?>) node.getOperator()) - .getUserFunction().getClass().getName(); - - hasher.putString(udfClassName, Charset.forName("UTF-8")); - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java deleted file mode 100644 index b1471b2..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/MultiplexingStreamRecordSerializer.java +++ /dev/null @@ -1,293 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.migration.streaming.runtime.streamrecord; - -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; - -import java.io.IOException; - -import static java.util.Objects.requireNonNull; - -/** - * Legacy multiplexing {@link TypeSerializer} for stream records, watermarks and other stream - * elements. 
- */ -public class MultiplexingStreamRecordSerializer<T> extends TypeSerializer<StreamElement> { - - - private static final long serialVersionUID = 1L; - - private static final int TAG_REC_WITH_TIMESTAMP = 0; - private static final int TAG_REC_WITHOUT_TIMESTAMP = 1; - private static final int TAG_WATERMARK = 2; - - - private final TypeSerializer<T> typeSerializer; - - public MultiplexingStreamRecordSerializer(TypeSerializer<T> serializer) { - if (serializer instanceof MultiplexingStreamRecordSerializer || serializer instanceof StreamRecordSerializer) { - throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); - } - this.typeSerializer = requireNonNull(serializer); - } - - public TypeSerializer<T> getContainedTypeSerializer() { - return this.typeSerializer; - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public MultiplexingStreamRecordSerializer<T> duplicate() { - TypeSerializer<T> copy = typeSerializer.duplicate(); - return (copy == typeSerializer) ? 
this : new MultiplexingStreamRecordSerializer<T>(copy); - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public StreamRecord<T> createInstance() { - return new StreamRecord<T>(typeSerializer.createInstance()); - } - - @Override - public int getLength() { - return -1; - } - - @Override - public StreamElement copy(StreamElement from) { - // we can reuse the timestamp since Instant is immutable - if (from.isRecord()) { - StreamRecord<T> fromRecord = from.asRecord(); - return fromRecord.copy(typeSerializer.copy(fromRecord.getValue())); - } - else if (from.isWatermark()) { - // is immutable - return from; - } - else { - throw new RuntimeException(); - } - } - - @Override - public StreamElement copy(StreamElement from, StreamElement reuse) { - if (from.isRecord() && reuse.isRecord()) { - StreamRecord<T> fromRecord = from.asRecord(); - StreamRecord<T> reuseRecord = reuse.asRecord(); - - T valueCopy = typeSerializer.copy(fromRecord.getValue(), reuseRecord.getValue()); - fromRecord.copyTo(valueCopy, reuseRecord); - return reuse; - } - else if (from.isWatermark()) { - // is immutable - return from; - } - else { - throw new RuntimeException("Cannot copy " + from + " -> " + reuse); - } - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - int tag = source.readByte(); - target.write(tag); - - if (tag == TAG_REC_WITH_TIMESTAMP) { - // move timestamp - target.writeLong(source.readLong()); - typeSerializer.copy(source, target); - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - typeSerializer.copy(source, target); - } - else if (tag == TAG_WATERMARK) { - target.writeLong(source.readLong()); - } - else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - @Override - public void serialize(StreamElement value, DataOutputView target) throws IOException { - if 
(value.isRecord()) { - StreamRecord<T> record = value.asRecord(); - - if (record.hasTimestamp()) { - target.write(TAG_REC_WITH_TIMESTAMP); - target.writeLong(record.getTimestamp()); - } else { - target.write(TAG_REC_WITHOUT_TIMESTAMP); - } - typeSerializer.serialize(record.getValue(), target); - } - else if (value.isWatermark()) { - target.write(TAG_WATERMARK); - target.writeLong(value.asWatermark().getTimestamp()); - } - else { - throw new RuntimeException(); - } - } - - @Override - public StreamElement deserialize(DataInputView source) throws IOException { - int tag = source.readByte(); - if (tag == TAG_REC_WITH_TIMESTAMP) { - long timestamp = source.readLong(); - return new StreamRecord<T>(typeSerializer.deserialize(source), timestamp); - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - return new StreamRecord<T>(typeSerializer.deserialize(source)); - } - else if (tag == TAG_WATERMARK) { - return new Watermark(source.readLong()); - } - else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - @Override - public StreamElement deserialize(StreamElement reuse, DataInputView source) throws IOException { - int tag = source.readByte(); - if (tag == TAG_REC_WITH_TIMESTAMP) { - long timestamp = source.readLong(); - T value = typeSerializer.deserialize(source); - StreamRecord<T> reuseRecord = reuse.asRecord(); - reuseRecord.replace(value, timestamp); - return reuseRecord; - } - else if (tag == TAG_REC_WITHOUT_TIMESTAMP) { - T value = typeSerializer.deserialize(source); - StreamRecord<T> reuseRecord = reuse.asRecord(); - reuseRecord.replace(value); - return reuseRecord; - } - else if (tag == TAG_WATERMARK) { - return new Watermark(source.readLong()); - } - else { - throw new IOException("Corrupt stream, found tag: " + tag); - } - } - - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // 
-------------------------------------------------------------------------------------------- - - @Override - public MultiplexingStreamRecordSerializerConfigSnapshot snapshotConfiguration() { - return new MultiplexingStreamRecordSerializerConfigSnapshot<>(typeSerializer); - } - - @Override - public CompatibilityResult<StreamElement> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof MultiplexingStreamRecordSerializerConfigSnapshot) { - Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = - ((MultiplexingStreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new MultiplexingStreamRecordSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); - } - - /** - * Configuration snapshot specific to the {@link MultiplexingStreamRecordSerializer}. - */ - public static final class MultiplexingStreamRecordSerializerConfigSnapshot<T> - extends CompositeTypeSerializerConfigSnapshot { - - private static final int VERSION = 1; - - /** This empty nullary constructor is required for deserializing the configuration. 
*/ - public MultiplexingStreamRecordSerializerConfigSnapshot() {} - - public MultiplexingStreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) { - super(typeSerializer); - } - - @Override - public int getVersion() { - return VERSION; - } - } - - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - if (obj instanceof MultiplexingStreamRecordSerializer) { - MultiplexingStreamRecordSerializer<?> other = (MultiplexingStreamRecordSerializer<?>) obj; - - return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); - } else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof MultiplexingStreamRecordSerializer; - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java b/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java deleted file mode 100644 index e018ba0..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/migration/streaming/runtime/streamrecord/StreamRecordSerializer.java +++ /dev/null @@ -1,208 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.migration.streaming.runtime.streamrecord; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.typeutils.CompatibilityResult; -import org.apache.flink.api.common.typeutils.CompatibilityUtil; -import org.apache.flink.api.common.typeutils.CompositeTypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.TypeDeserializerAdapter; -import org.apache.flink.api.common.typeutils.TypeSerializer; -import org.apache.flink.api.common.typeutils.TypeSerializerConfigSnapshot; -import org.apache.flink.api.common.typeutils.UnloadableDummyTypeSerializer; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Preconditions; - -import java.io.IOException; - -/** - * Serializer for {@link StreamRecord}. This version ignores timestamps and only deals with - * the element. - * - * <p>{@link MultiplexingStreamRecordSerializer} is a version that deals with timestamps and also - * multiplexes {@link org.apache.flink.streaming.api.watermark.Watermark Watermarks} in the same - * stream with {@link StreamRecord StreamRecords}. 
- * - * @see MultiplexingStreamRecordSerializer - * - * @param <T> The type of value in the {@link StreamRecord} - */ -@Internal -public final class StreamRecordSerializer<T> extends TypeSerializer<StreamRecord<T>> { - - private static final long serialVersionUID = 1L; - - private final TypeSerializer<T> typeSerializer; - - public StreamRecordSerializer(TypeSerializer<T> serializer) { - if (serializer instanceof StreamRecordSerializer) { - throw new RuntimeException("StreamRecordSerializer given to StreamRecordSerializer as value TypeSerializer: " + serializer); - } - this.typeSerializer = Preconditions.checkNotNull(serializer); - } - - public TypeSerializer<T> getContainedTypeSerializer() { - return this.typeSerializer; - } - - // ------------------------------------------------------------------------ - // General serializer and type utils - // ------------------------------------------------------------------------ - - @Override - public StreamRecordSerializer<T> duplicate() { - TypeSerializer<T> serializerCopy = typeSerializer.duplicate(); - return serializerCopy == typeSerializer ? 
this : new StreamRecordSerializer<T>(serializerCopy); - } - - @Override - public boolean isImmutableType() { - return false; - } - - @Override - public int getLength() { - return typeSerializer.getLength(); - } - - // ------------------------------------------------------------------------ - // Type serialization, copying, instantiation - // ------------------------------------------------------------------------ - - @Override - public StreamRecord<T> createInstance() { - try { - return new StreamRecord<T>(typeSerializer.createInstance()); - } catch (Exception e) { - throw new RuntimeException("Cannot instantiate StreamRecord.", e); - } - } - - @Override - public StreamRecord<T> copy(StreamRecord<T> from) { - return from.copy(typeSerializer.copy(from.getValue())); - } - - @Override - public StreamRecord<T> copy(StreamRecord<T> from, StreamRecord<T> reuse) { - from.copyTo(typeSerializer.copy(from.getValue(), reuse.getValue()), reuse); - return reuse; - } - - @Override - public void serialize(StreamRecord<T> value, DataOutputView target) throws IOException { - typeSerializer.serialize(value.getValue(), target); - } - - @Override - public StreamRecord<T> deserialize(DataInputView source) throws IOException { - return new StreamRecord<T>(typeSerializer.deserialize(source)); - } - - @Override - public StreamRecord<T> deserialize(StreamRecord<T> reuse, DataInputView source) throws IOException { - T element = typeSerializer.deserialize(reuse.getValue(), source); - reuse.replace(element); - return reuse; - } - - @Override - public void copy(DataInputView source, DataOutputView target) throws IOException { - typeSerializer.copy(source, target); - } - - // ------------------------------------------------------------------------ - - @Override - public boolean equals(Object obj) { - if (obj instanceof StreamRecordSerializer) { - StreamRecordSerializer<?> other = (StreamRecordSerializer<?>) obj; - - return other.canEqual(this) && typeSerializer.equals(other.typeSerializer); - } 
else { - return false; - } - } - - @Override - public boolean canEqual(Object obj) { - return obj instanceof StreamRecordSerializer; - } - - @Override - public int hashCode() { - return typeSerializer.hashCode(); - } - - // -------------------------------------------------------------------------------------------- - // Serializer configuration snapshotting & compatibility - // -------------------------------------------------------------------------------------------- - - @Override - public StreamRecordSerializerConfigSnapshot snapshotConfiguration() { - return new StreamRecordSerializerConfigSnapshot<>(typeSerializer); - } - - @Override - public CompatibilityResult<StreamRecord<T>> ensureCompatibility(TypeSerializerConfigSnapshot configSnapshot) { - if (configSnapshot instanceof StreamRecordSerializerConfigSnapshot) { - Tuple2<TypeSerializer<?>, TypeSerializerConfigSnapshot> previousTypeSerializerAndConfig = - ((StreamRecordSerializerConfigSnapshot) configSnapshot).getSingleNestedSerializerAndConfig(); - - CompatibilityResult<T> compatResult = CompatibilityUtil.resolveCompatibilityResult( - previousTypeSerializerAndConfig.f0, - UnloadableDummyTypeSerializer.class, - previousTypeSerializerAndConfig.f1, - typeSerializer); - - if (!compatResult.isRequiresMigration()) { - return CompatibilityResult.compatible(); - } else if (compatResult.getConvertDeserializer() != null) { - return CompatibilityResult.requiresMigration( - new StreamRecordSerializer<>( - new TypeDeserializerAdapter<>(compatResult.getConvertDeserializer()))); - } - } - - return CompatibilityResult.requiresMigration(); - } - - /** - * Configuration snapshot specific to the {@link StreamRecordSerializer}. - */ - public static final class StreamRecordSerializerConfigSnapshot<T> extends CompositeTypeSerializerConfigSnapshot { - - private static final int VERSION = 1; - - /** This empty nullary constructor is required for deserializing the configuration. 
*/ - public StreamRecordSerializerConfigSnapshot() {} - - public StreamRecordSerializerConfigSnapshot(TypeSerializer<T> typeSerializer) { - super(typeSerializer); - } - - @Override - public int getVersion() { - return VERSION; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java deleted file mode 100644 index cb3c7cc..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/Checkpointed.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.checkpoint; - -import org.apache.flink.annotation.PublicEvolving; - -import java.io.Serializable; - -/** - * This method must be implemented by functions that have state that needs to be - * checkpointed. 
The functions get a call whenever a checkpoint should take place - * and return a snapshot of their state, which will be checkpointed. - * - * <h1>Deprecation and Replacement</h1> - * The shortcut replacement for this interface is via {@link ListCheckpointed} and works - * as shown in the example below. The {@code ListCheckpointed} interface returns a list of - * elements. - * - * <p><pre>{@code - * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> { - * - * private int count; - * - * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - * return Collections.singletonList(this.count); - * } - * - * public void restoreState(List<Integer> state) throws Exception { - * this.count = state.isEmpty() ? 0 : state.get(0); - * } - * - * public T map(T value) { - * count++; - * return value; - * } - * } - * }</pre> - * - * @param <T> The type of the operator state. - * - * @deprecated Please use {@link ListCheckpointed} as illustrated above, or - * {@link CheckpointedFunction} for more control over the checkpointing process. - */ -@Deprecated -@PublicEvolving -public interface Checkpointed<T extends Serializable> extends CheckpointedRestoring<T> { - - /** - * Gets the current state of the function or operator. The state must reflect the result of all - * prior invocations to this function. - * - * @param checkpointId The ID of the checkpoint. - * @param checkpointTimestamp The timestamp of the checkpoint, as derived by - * System.currentTimeMillis() on the JobManager. - * - * @return A snapshot of the operator state. - * - * @throws Exception Thrown if the creation of the state object failed. This causes the - * checkpoint to fail. The system may decide to fail the operation (and trigger - * recovery), or to discard this checkpoint attempt and to continue running - * and to try again with the next checkpoint attempt. 
- */ - T snapshotState(long checkpointId, long checkpointTimestamp) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java deleted file mode 100644 index 5138b49..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedAsynchronously.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.checkpoint; - -import org.apache.flink.annotation.PublicEvolving; - -import java.io.Serializable; - -/** - * This interface marks a function/operator as checkpointed similar to the - * {@link Checkpointed} interface, but gives the Flink framework the option to - * perform the checkpoint asynchronously. 
Note that asynchronous checkpointing for - * this interface has not been implemented. - * - * <h1>Deprecation and Replacement</h1> - * The shortcut replacement for this interface is via {@link ListCheckpointed} and works - * as shown in the example below. Please refer to the JavaDocs of {@link ListCheckpointed} for - * a more detailed description of how to use the new interface. - * - * <p><pre>{@code - * public class ExampleFunction<T> implements MapFunction<T, T>, ListCheckpointed<Integer> { - * - * private int count; - * - * public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception { - * return Collections.singletonList(this.count); - * } - * - * public void restoreState(List<Integer> state) throws Exception { - * this.count = state.isEmpty() ? 0 : state.get(0); - * } - * - * public T map(T value) { - * count++; - * return value; - * } - * } - * }</pre> - * - * @deprecated Please use {@link ListCheckpointed} and {@link CheckpointedFunction} instead, - * as illustrated in the example above. - */ -@Deprecated -@PublicEvolving -public interface CheckpointedAsynchronously<T extends Serializable> extends Checkpointed<T> {} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java deleted file mode 100644 index cfaa505..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointedRestoring.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.checkpoint; - -import org.apache.flink.annotation.PublicEvolving; - -import java.io.Serializable; - -/** - * This deprecated interface contains the methods for restoring from the legacy checkpointing mechanism of state. - * @param <T> type of the restored state. - * - * @deprecated Please use {@link CheckpointedFunction} or {@link ListCheckpointed} after restoring your legacy state. - */ -@Deprecated -@PublicEvolving -public interface CheckpointedRestoring<T extends Serializable> { - /** - * Restores the state of the function or operator to that of a previous checkpoint. - * This method is invoked when a function is executed as part of a recovery run. - * - * <p>Note that restoreState() is called before open(). - * - * @param state The state to be restored. 
- */ - void restoreState(T state) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java deleted file mode 100644 index bb6e4bc..0000000 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/LegacyWindowOperatorType.java +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api.datastream; - -/** - * For specifying what type of window operator was used to create the state - * that a {@link org.apache.flink.streaming.runtime.operators.windowing.WindowOperator} - * is restoring from. This is used to signal that state written using an aligned processing-time - * window operator should be restored. 
- */ -public enum LegacyWindowOperatorType { - - FAST_ACCUMULATING(true, false), - - FAST_AGGREGATING(false, true), - - NONE(false, false); - - // ------------------------------------------------------------------------ - - private final boolean fastAccumulating; - private final boolean fastAggregating; - - LegacyWindowOperatorType(boolean fastAccumulating, boolean fastAggregating) { - this.fastAccumulating = fastAccumulating; - this.fastAggregating = fastAggregating; - } - - public boolean isFastAccumulating() { - return fastAccumulating; - } - - public boolean isFastAggregating() { - return fastAggregating; - } - - @Override - public String toString() { - if (fastAccumulating) { - return "AccumulatingProcessingTimeWindowOperator"; - } else if (fastAggregating) { - return "AggregatingProcessingTimeWindowOperator"; - } else { - return "WindowOperator"; - } - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java index 348861f..f904a10 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/WindowedStream.java @@ -23,7 +23,6 @@ import org.apache.flink.annotation.Public; import org.apache.flink.annotation.PublicEvolving; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink.api.common.functions.FoldFunction; -import org.apache.flink.api.common.functions.Function; import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.functions.RichFunction; import 
org.apache.flink.api.common.state.AggregatingStateDescriptor; @@ -50,19 +49,11 @@ import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.windowing.assigners.BaseAlignedWindowAssigner; import org.apache.flink.streaming.api.windowing.assigners.MergingWindowAssigner; -import org.apache.flink.streaming.api.windowing.assigners.SlidingAlignedProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingAlignedProcessingTimeWindows; -import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.streaming.api.windowing.evictors.Evictor; import org.apache.flink.streaming.api.windowing.time.Time; -import org.apache.flink.streaming.api.windowing.triggers.ProcessingTimeTrigger; import org.apache.flink.streaming.api.windowing.triggers.Trigger; -import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; -import org.apache.flink.streaming.runtime.operators.windowing.AccumulatingProcessingTimeWindowOperator; -import org.apache.flink.streaming.runtime.operators.windowing.AggregatingProcessingTimeWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.WindowOperator; import org.apache.flink.streaming.runtime.operators.windowing.functions.InternalAggregateProcessWindowFunction; @@ -227,33 +218,7 @@ public class WindowedStream<T, K, W extends Window> { //clean the closure function = input.getExecutionEnvironment().clean(function); - - String callLocation = Utils.getCallLocationName(); - String udfName = "WindowedStream." 
+ callLocation; - - SingleOutputStreamOperator<T> result = createFastTimeOperatorIfValid(function, input.getType(), udfName); - if (result != null) { - return result; - } - - LegacyWindowOperatorType legacyOpType = getLegacyWindowType(function); - return reduce(function, new PassThroughWindowFunction<K, W, T>(), legacyOpType); - } - - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @return The data stream that is the result of applying the window function to the window. - */ - @PublicEvolving - public <R> SingleOutputStreamOperator<R> reduce(ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function) { - return reduce(reduceFunction, function, LegacyWindowOperatorType.NONE); + return reduce(function, new PassThroughWindowFunction<K, W, T>()); } /** @@ -265,39 +230,15 @@ public class WindowedStream<T, K, W extends Window> { * * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. - * @param resultType Type information for the result type of the window function * @return The data stream that is the result of applying the window function to the window. */ - @PublicEvolving public <R> SingleOutputStreamOperator<R> reduce( - ReduceFunction<T> reduceFunction, - WindowFunction<T, R, K, W> function, - TypeInformation<R> resultType) { - return reduce(reduceFunction, function, resultType, LegacyWindowOperatorType.NONE); - } - - /** - * Applies the given window function to each window. The window function is called for each - * evaluation of the window for each key individually. 
The output of the window function is - * interpreted as a regular non-windowed stream. - * - * <p>Arriving data is incrementally aggregated using the given reducer. - * - * @param reduceFunction The reduce function that is used for incremental aggregation. - * @param function The window function. - * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates - * the type of the previous operator whose state we inherit. - * @return The data stream that is the result of applying the window function to the window. - */ - private <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, - WindowFunction<T, R, K, W> function, - LegacyWindowOperatorType legacyWindowOpType) { + WindowFunction<T, R, K, W> function) { TypeInformation<T> inType = input.getType(); TypeInformation<R> resultType = getWindowFunctionReturnType(function, inType); - - return reduce(reduceFunction, function, resultType, legacyWindowOpType); + return reduce(reduceFunction, function, resultType); } /** @@ -310,15 +251,12 @@ public class WindowedStream<T, K, W extends Window> { * @param reduceFunction The reduce function that is used for incremental aggregation. * @param function The window function. * @param resultType Type information for the result type of the window function. - * @param legacyWindowOpType When migrating from an older Flink version, this flag indicates - * the type of the previous operator whose state we inherit. * @return The data stream that is the result of applying the window function to the window. 
*/ - private <R> SingleOutputStreamOperator<R> reduce( + public <R> SingleOutputStreamOperator<R> reduce( ReduceFunction<T> reduceFunction, WindowFunction<T, R, K, W> function, - TypeInformation<R> resultType, - LegacyWindowOperatorType legacyWindowOpType) { + TypeInformation<R> resultType) { if (reduceFunction instanceof RichFunction) { throw new UnsupportedOperationException("ReduceFunction of reduce can not be a RichFunction."); @@ -374,8 +312,7 @@ public class WindowedStream<T, K, W extends Window> { new InternalSingleValueWindowFunction<>(function), trigger, allowedLateness, - lateDataOutputTag, - legacyWindowOpType); + lateDataOutputTag); } return input.transform(opName, resultType, operator); @@ -1183,12 +1120,6 @@ public class WindowedStream<T, K, W extends Window> { String udfName = "WindowedStream." + callLocation; - SingleOutputStreamOperator<R> result = createFastTimeOperatorIfValid(function, resultType, udfName); - if (result != null) { - return result; - } - - LegacyWindowOperatorType legacyWindowOpType = getLegacyWindowType(function); String opName; KeySelector<T, K> keySel = input.getKeySelector(); @@ -1231,8 +1162,7 @@ public class WindowedStream<T, K, W extends Window> { function, trigger, allowedLateness, - lateDataOutputTag, - legacyWindowOpType); + lateDataOutputTag); } return input.transform(opName, resultType, operator); @@ -1629,119 +1559,6 @@ public class WindowedStream<T, K, W extends Window> { return reduce(aggregator); } - // ------------------------------------------------------------------------ - // Utilities - // ------------------------------------------------------------------------ - - private LegacyWindowOperatorType getLegacyWindowType(Function function) { - if (windowAssigner instanceof SlidingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { - if (function instanceof ReduceFunction) { - return LegacyWindowOperatorType.FAST_AGGREGATING; - } else if (function instanceof WindowFunction) { - 
return LegacyWindowOperatorType.FAST_ACCUMULATING; - } - } else if (windowAssigner instanceof TumblingProcessingTimeWindows && trigger instanceof ProcessingTimeTrigger && evictor == null) { - if (function instanceof ReduceFunction) { - return LegacyWindowOperatorType.FAST_AGGREGATING; - } else if (function instanceof WindowFunction) { - return LegacyWindowOperatorType.FAST_ACCUMULATING; - } - } - return LegacyWindowOperatorType.NONE; - } - - private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid( - ReduceFunction<?> function, - TypeInformation<R> resultType, - String functionName) { - - if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { - SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner; - final long windowLength = timeWindows.getSize(); - final long windowSlide = timeWindows.getSlide(); - - String opName = "Fast " + timeWindows + " of " + functionName; - - @SuppressWarnings("unchecked") - ReduceFunction<T> reducer = (ReduceFunction<T>) function; - - @SuppressWarnings("unchecked") - OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>) - new AggregatingProcessingTimeWindowOperator<>( - reducer, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - - } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { - TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner; - final long windowLength = timeWindows.getSize(); - final long windowSlide = timeWindows.getSize(); - - String opName = "Fast " + timeWindows + " of " + functionName; - - @SuppressWarnings("unchecked") - ReduceFunction<T> reducer = 
(ReduceFunction<T>) function; - - @SuppressWarnings("unchecked") - OneInputStreamOperator<T, R> op = (OneInputStreamOperator<T, R>) - new AggregatingProcessingTimeWindowOperator<>( - reducer, - input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } - - return null; - } - - private <R> SingleOutputStreamOperator<R> createFastTimeOperatorIfValid( - InternalWindowFunction<Iterable<T>, R, K, W> function, - TypeInformation<R> resultType, - String functionName) { - - if (windowAssigner.getClass() == SlidingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { - SlidingAlignedProcessingTimeWindows timeWindows = (SlidingAlignedProcessingTimeWindows) windowAssigner; - final long windowLength = timeWindows.getSize(); - final long windowSlide = timeWindows.getSlide(); - - String opName = "Fast " + timeWindows + " of " + functionName; - - @SuppressWarnings("unchecked") - InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction = - (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function; - - OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( - timeWindowFunction, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } else if (windowAssigner.getClass() == TumblingAlignedProcessingTimeWindows.class && trigger == null && evictor == null) { - TumblingAlignedProcessingTimeWindows timeWindows = (TumblingAlignedProcessingTimeWindows) windowAssigner; - final long windowLength = timeWindows.getSize(); - final long windowSlide = timeWindows.getSize(); - - String opName = "Fast " + timeWindows + " of " + 
functionName; - - @SuppressWarnings("unchecked") - InternalWindowFunction<Iterable<T>, R, K, TimeWindow> timeWindowFunction = - (InternalWindowFunction<Iterable<T>, R, K, TimeWindow>) function; - - OneInputStreamOperator<T, R> op = new AccumulatingProcessingTimeWindowOperator<>( - timeWindowFunction, input.getKeySelector(), - input.getKeyType().createSerializer(getExecutionEnvironment().getConfig()), - input.getType().createSerializer(getExecutionEnvironment().getConfig()), - windowLength, windowSlide); - return input.transform(opName, resultType, op); - } - - return null; - } - public StreamExecutionEnvironment getExecutionEnvironment() { return input.getExecutionEnvironment(); } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java index 3c4cfbd..fedd791 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileMonitoringFunction.java @@ -32,7 +32,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; -import org.apache.flink.streaming.api.checkpoint.CheckpointedRestoring; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; @@ -66,7 +65,7 @@ import java.util.TreeMap; */ @Internal public class ContinuousFileMonitoringFunction<OUT> - 
extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction, CheckpointedRestoring<Long> { + extends RichSourceFunction<TimestampedFileInputSplit> implements CheckpointedFunction { private static final long serialVersionUID = 1L; @@ -375,12 +374,4 @@ public class ContinuousFileMonitoringFunction<OUT> LOG.debug("{} checkpointed {}.", getClass().getSimpleName(), globalModificationTime); } } - - @Override - public void restoreState(Long state) throws Exception { - this.globalModificationTime = state; - - LOG.info("{} (taskIdx={}) restored global modification time from an older Flink version: {}", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask(), globalModificationTime); - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 3a9e8e1..e14cfda 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -25,30 +25,23 @@ import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.configuration.Configuration; -import org.apache.flink.core.fs.FSDataInputStream; -import org.apache.flink.core.fs.FileInputSplit; -import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.metrics.Counter; import 
org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; -import org.apache.flink.streaming.api.operators.CheckpointedRestoringOperator; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.OutputTypeConfigurable; import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.io.ObjectInputStream; import java.io.Serializable; import java.util.ArrayList; -import java.util.LinkedList; import java.util.List; import java.util.PriorityQueue; import java.util.Queue; @@ -60,15 +53,15 @@ import static org.apache.flink.util.Preconditions.checkState; * The operator that reads the {@link TimestampedFileInputSplit splits} received from the preceding * {@link ContinuousFileMonitoringFunction}. Contrary to the {@link ContinuousFileMonitoringFunction} * which has a parallelism of 1, this operator can have DOP > 1. - * <p/> - * As soon as a split descriptor is received, it is put in a queue, and have another + * + * <p>As soon as a split descriptor is received, it is put in a queue, and have another * thread read the actual data of the split. This architecture allows the separation of the * reading thread from the one emitting the checkpoint barriers, thus removing any potential * back-pressure. 
*/ @Internal public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OUT> - implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT>, CheckpointedRestoringOperator { + implements OneInputStreamOperator<TimestampedFileInputSplit, OUT>, OutputTypeConfigurable<OUT> { private static final long serialVersionUID = 1L; @@ -422,83 +415,4 @@ public class ContinuousFileReaderOperator<OUT> extends AbstractStreamOperator<OU getClass().getSimpleName(), subtaskIdx, readerState.size(), readerState); } } - - // ------------------------------------------------------------------------ - // Restoring / Migrating from an older Flink version. - // ------------------------------------------------------------------------ - - @Override - public void restoreState(FSDataInputStream in) throws Exception { - - LOG.info("{} (taskIdx={}) restoring state from an older Flink version.", - getClass().getSimpleName(), getRuntimeContext().getIndexOfThisSubtask()); - - // this is just to read the byte indicating if we have udf state or not - int hasUdfState = in.read(); - - Preconditions.checkArgument(hasUdfState == 0); - - final ObjectInputStream ois = new ObjectInputStream(in); - final DataInputViewStreamWrapper div = new DataInputViewStreamWrapper(in); - - // read the split that was being read - FileInputSplit currSplit = (FileInputSplit) ois.readObject(); - - // read the pending splits list - List<FileInputSplit> pendingSplits = new LinkedList<>(); - int noOfSplits = div.readInt(); - for (int i = 0; i < noOfSplits; i++) { - FileInputSplit split = (FileInputSplit) ois.readObject(); - pendingSplits.add(split); - } - - // read the state of the format - Serializable formatState = (Serializable) ois.readObject(); - - div.close(); - - if (restoredReaderState == null) { - restoredReaderState = new ArrayList<>(); - } - - // we do not know the modification time of the retrieved splits, so we assign them - // artificial ones, with the only 
constraint that they respect the relative order of the - // retrieved splits, because modification time is going to be used to sort the splits within - // the "pending splits" priority queue. - - long now = getProcessingTimeService().getCurrentProcessingTime(); - long runningModTime = Math.max(now, noOfSplits + 1); - - TimestampedFileInputSplit currentSplit = createTimestampedFileSplit(currSplit, --runningModTime, formatState); - restoredReaderState.add(currentSplit); - for (FileInputSplit split : pendingSplits) { - TimestampedFileInputSplit timestampedSplit = createTimestampedFileSplit(split, --runningModTime); - restoredReaderState.add(timestampedSplit); - } - - if (LOG.isDebugEnabled()) { - if (LOG.isDebugEnabled()) { - LOG.debug("{} (taskIdx={}) restored {} splits from legacy: {}.", - getClass().getSimpleName(), - getRuntimeContext().getIndexOfThisSubtask(), - restoredReaderState.size(), - restoredReaderState); - } - } - } - - private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime) { - return createTimestampedFileSplit(split, modificationTime, null); - } - - private TimestampedFileInputSplit createTimestampedFileSplit(FileInputSplit split, long modificationTime, Serializable state) { - TimestampedFileInputSplit timestampedSplit = new TimestampedFileInputSplit( - modificationTime, split.getSplitNumber(), split.getPath(), - split.getStart(), split.getLength(), split.getHostnames()); - - if (state != null) { - timestampedSplit.setSplitState(state); - } - return timestampedSplit; - } } http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index abaa74e..884b899 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -26,7 +26,6 @@ import org.apache.flink.api.common.restartstrategy.RestartStrategies; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.IllegalConfigurationException; -import org.apache.flink.migration.streaming.api.graph.StreamGraphHasherV1; import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -116,7 +115,7 @@ public class StreamingJobGraphGenerator { private StreamingJobGraphGenerator(StreamGraph streamGraph) { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); - this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher()); + this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); this.jobVertices = new HashMap<>(); this.builtVertices = new HashSet<>(); @@ -241,14 +240,14 @@ public class StreamingJobGraphGenerator { createChain(nonChainable.getTargetId(), nonChainable.getTargetId(), hashes, legacyHashes, 0, chainedOperatorHashes); } - List<Tuple2<byte[], byte[]>> operatorHashes = chainedOperatorHashes.get(startNodeId); - if (operatorHashes == null) { - operatorHashes = new ArrayList<>(); - chainedOperatorHashes.put(startNodeId, operatorHashes); - } + List<Tuple2<byte[], byte[]>> operatorHashes = + chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); byte[] primaryHashBytes = hashes.get(currentNodeId); - operatorHashes.add(new 
Tuple2<>(primaryHashBytes, legacyHashes.get(1).get(currentNodeId))); + + for (Map<Integer, byte[]> legacyHash : legacyHashes) { + operatorHashes.add(new Tuple2<>(primaryHashBytes, legacyHash.get(currentNodeId))); + } chainedNames.put(currentNodeId, createChainedName(currentNodeId, chainableOutputs)); chainedMinResources.put(currentNodeId, createChainedMinResources(currentNodeId, chainableOutputs)); http://git-wip-us.apache.org/repos/asf/flink/blob/6642768a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java index a72b9fe..a28fc30 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperator.java @@ -28,7 +28,6 @@ import org.apache.flink.api.common.typeutils.TypeSerializer; import org.apache.flink.api.java.functions.KeySelector; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputViewStreamWrapper; import org.apache.flink.metrics.Counter; @@ -55,7 +54,6 @@ import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateInitializationContextImpl; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.state.StateSnapshotContextSynchronousImpl; -import org.apache.flink.runtime.state.StreamStateHandle; import org.apache.flink.runtime.state.VoidNamespace; import 
org.apache.flink.runtime.state.VoidNamespaceSerializer; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -251,42 +249,6 @@ public abstract class AbstractStreamOperator<OUT> getContainingTask().getCancelables()); // access to register streams for canceling initializeState(initializationContext); - - if (restoring) { - - // finally restore the legacy state in case we are - // migrating from a previous Flink version. - - restoreStreamCheckpointed(stateHandles); - } - } - - /** - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. - */ - @Deprecated - private void restoreStreamCheckpointed(OperatorSubtaskState stateHandles) throws Exception { - StreamStateHandle state = stateHandles.getLegacyOperatorState(); - if (null != state) { - if (this instanceof CheckpointedRestoringOperator) { - - LOG.debug("Restore state of task {} in operator with id ({}).", - getContainingTask().getName(), getOperatorID()); - - FSDataInputStream is = state.openInputStream(); - try { - getContainingTask().getCancelables().registerClosable(is); - ((CheckpointedRestoringOperator) this).restoreState(is); - } finally { - getContainingTask().getCancelables().unregisterClosable(is); - is.close(); - } - } else { - throw new Exception( - "Found legacy operator state for operator that does not implement StreamCheckpointedOperator."); - } - } } /** @@ -451,35 +413,6 @@ public abstract class AbstractStreamOperator<OUT> } /** - * @deprecated Non-repartitionable operator state that has been deprecated. - * Can be removed when we remove the APIs for non-repartitionable operator state. 
- */ - @SuppressWarnings("deprecation") - @Deprecated - @Override - public StreamStateHandle snapshotLegacyOperatorState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions) throws Exception { - if (this instanceof StreamCheckpointedOperator) { - CheckpointStreamFactory factory = getCheckpointStreamFactory(checkpointOptions); - - final CheckpointStreamFactory.CheckpointStateOutputStream outStream = - factory.createCheckpointStateOutputStream(checkpointId, timestamp); - - getContainingTask().getCancelables().registerClosable(outStream); - - try { - ((StreamCheckpointedOperator) this).snapshotState(outStream, checkpointId, timestamp); - return outStream.closeAndGetHandle(); - } - finally { - getContainingTask().getCancelables().unregisterClosable(outStream); - outStream.close(); - } - } else { - return null; - } - } - - /** * Stream operators with state which can be restored need to override this hook method. * * @param context context that allows to register different states.