Repository: flink Updated Branches: refs/heads/master 8d1efa045 -> 84b39dcb5
[FLINK-2008] [FLINK-2296] Fix checkpoint committing & KafkaITCase This closes #895 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/aa5e5b30 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/aa5e5b30 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/aa5e5b30 Branch: refs/heads/master Commit: aa5e5b3087a68f2aac792c0b0fc64b4f9c707e9b Parents: 8d1efa0 Author: Robert Metzger <[email protected]> Authored: Mon Jun 29 16:52:38 2015 +0200 Committer: Stephan Ewen <[email protected]> Committed: Mon Jul 13 17:54:30 2015 +0200 ---------------------------------------------------------------------- docs/apis/streaming_guide.md | 3 + .../checkpoint/CheckpointCoordinator.java | 11 +- .../checkpoint/SuccessfulCheckpoint.java | 22 +-- .../runtime/executiongraph/ExecutionVertex.java | 4 + .../tasks/CheckpointCommittingOperator.java | 27 ---- .../tasks/CheckpointNotificationOperator.java | 25 ++++ .../messages/checkpoint/ConfirmCheckpoint.java | 88 ------------ .../checkpoint/NotifyCheckpointComplete.java | 73 ++++++++++ .../apache/flink/runtime/taskmanager/Task.java | 20 +-- .../flink/runtime/taskmanager/TaskManager.scala | 7 +- .../checkpoint/CheckpointCoordinatorTest.java | 18 +-- .../messages/CheckpointMessagesTest.java | 4 +- .../runtime/taskmanager/TaskAsyncCallTest.java | 12 +- .../flink/streaming/connectors/kafka/Utils.java | 30 +++- .../api/persistent/PersistentKafkaSource.java | 108 ++++++++++---- .../streaming/connectors/kafka/KafkaITCase.java | 144 +++++++++++++++---- .../connectors/kafka/util/UtilsTest.java | 75 ++++++++++ .../api/checkpoint/CheckpointCommitter.java | 44 ------ .../api/checkpoint/CheckpointNotifier.java | 37 +++++ .../api/graph/StreamingJobGraphGenerator.java | 5 +- .../operators/AbstractUdfStreamOperator.java | 9 +- .../api/operators/StatefulStreamOperator.java | 2 +- .../streaming/runtime/tasks/StreamTask.java | 80 ++++------- .../src/test/resources/log4j-test.properties | 
2 +- .../StreamCheckpointingITCase.java | 14 +- .../org/apache/flink/yarn/YarnTestBase.java | 6 +- 26 files changed, 525 insertions(+), 345 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/docs/apis/streaming_guide.md ---------------------------------------------------------------------- diff --git a/docs/apis/streaming_guide.md b/docs/apis/streaming_guide.md index e337ea8..02da8cb 100644 --- a/docs/apis/streaming_guide.md +++ b/docs/apis/streaming_guide.md @@ -1319,6 +1319,9 @@ Another way of exposing user defined operator state for the Flink runtime for ch When the user defined function implements the `Checkpointed` interface, the `snapshotState(…)` and `restoreState(…)` methods will be executed to draw and restore function state. +In addition to that, user functions can also implement the `CheckpointNotifier` interface to receive notifications on completed checkpoints via the `notifyCheckpointComplete(long checkpointId)` method. +Note that there is no guarantee for the user function to receive a notification if a failure happens between checkpoint completion and notification. The notifications should hence be treated in a way that notifications from later checkpoints can subsume missing notifications. 
+ For example the same counting, reduce function shown for `OperatorState`s by using the `Checkpointed` interface instead: {% highlight java %} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index 91fd424..2b2bf6b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -30,10 +30,8 @@ import org.apache.flink.runtime.executiongraph.ExecutionJobVertex; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -375,11 +373,8 @@ public class CheckpointCoordinator { Execution ee = ev.getCurrentExecutionAttempt(); if (ee != null) { ExecutionAttemptID attemptId = ee.getAttemptId(); - StateForTask stateForTask = completed.getState(ev.getJobvertexId()); - SerializedValue<StateHandle<?>> taskState = (stateForTask != null) ? 
stateForTask.getState() : null; - ConfirmCheckpoint confirmMessage = new ConfirmCheckpoint(job, attemptId, checkpointId, - timestamp, taskState); - ev.sendMessageToCurrentExecution(confirmMessage, ee.getAttemptId()); + NotifyCheckpointComplete notifyMessage = new NotifyCheckpointComplete(job, attemptId, checkpointId, timestamp); + ev.sendMessageToCurrentExecution(notifyMessage, ee.getAttemptId()); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java index 5432d33..be0b301 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/SuccessfulCheckpoint.java @@ -18,14 +18,11 @@ package org.apache.flink.runtime.checkpoint; -import com.google.common.collect.Maps; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.jobgraph.JobVertexID; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; -import java.util.Map; /** * A successful checkpoint describes a checkpoint after all required tasks acknowledged it (with their state) @@ -41,9 +38,7 @@ public class SuccessfulCheckpoint { private final long timestamp; - private final Map<JobVertexID, StateForTask> vertexToState; - - private final List<StateForTask> states; + private final List<StateForTask> states; public SuccessfulCheckpoint(JobID job, long checkpointID, long timestamp, List<StateForTask> states) { @@ -51,10 +46,6 @@ public class SuccessfulCheckpoint { this.checkpointID = checkpointID; this.timestamp = timestamp; this.states = states; - vertexToState = Maps.newHashMap(); - for(StateForTask state 
: states){ - vertexToState.put(state.getOperatorId(), state); - } } public JobID getJobId() { @@ -73,17 +64,6 @@ public class SuccessfulCheckpoint { return states; } - /** - * Returns the task state included in the checkpoint for a given JobVertexID if it exists or - * null if no state is included for that id. - * - * @param jobVertexID - * @return - */ - public StateForTask getState(JobVertexID jobVertexID) { - return vertexToState.get(jobVertexID); - } - // -------------------------------------------------------------------------------------------- public void discard(ClassLoader userClassLoader) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index d44cb6a..a70fa7d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -168,6 +168,10 @@ public class ExecutionVertex implements Serializable { getTotalNumberOfParallelSubtasks()); } + public int getSubTaskIndex() { + return subTaskIndex; + } + public int getTotalNumberOfParallelSubtasks() { return this.jobVertex.getParallelism(); } http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java deleted file mode 100644 index 
a6f9851..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointCommittingOperator.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.jobgraph.tasks; - -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; - -public interface CheckpointCommittingOperator { - - void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java new file mode 100644 index 0000000..90c82b7 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/tasks/CheckpointNotificationOperator.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.jobgraph.tasks; + + +public interface CheckpointNotificationOperator { + + void notifyCheckpointComplete(long checkpointId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java deleted file mode 100644 index 328f692..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/ConfirmCheckpoint.java +++ /dev/null @@ -1,88 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.messages.checkpoint; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; - -/** - * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the - * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint - * has been confirmed and that the task can commit the checkpoint to the outside world. - */ -public class ConfirmCheckpoint extends AbstractCheckpointMessage implements java.io.Serializable { - - private static final long serialVersionUID = 2094094662279578953L; - - /** The timestamp associated with the checkpoint */ - private final long timestamp; - - /** The stateHandle associated with the checkpoint confirmation message*/ - private final SerializedValue<StateHandle<?>> state; - - public ConfirmCheckpoint(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp, - SerializedValue<StateHandle<?>> state) { - super(job, taskExecutionId, checkpointId); - this.timestamp = timestamp; - this.state = state; - } - - // -------------------------------------------------------------------------------------------- - - public long getTimestamp() { - return timestamp; - } - - // -------------------------------------------------------------------------------------------- - - /** - * Returns the stateHandle that was included in the confirmed checkpoint for a given task 
or null - * if no state was commited in that checkpoint. - */ - public SerializedValue<StateHandle<?>> getState() { - return state; - } - - @Override - public int hashCode() { - return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32)); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - else if (o instanceof ConfirmCheckpoint) { - ConfirmCheckpoint that = (ConfirmCheckpoint) o; - return this.timestamp == that.timestamp && super.equals(o); - } - else { - return false; - } - } - - @Override - public String toString() { - return String.format("ConfirmCheckpoint %d for (%s/%s)", - getCheckpointId(), getJob(), getTaskExecutionId()); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java new file mode 100644 index 0000000..c64c77a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/NotifyCheckpointComplete.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.messages.checkpoint; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; + +/** + * This message is sent from the {@link org.apache.flink.runtime.jobmanager.JobManager} to the + * {@link org.apache.flink.runtime.taskmanager.TaskManager} to tell a task that the checkpoint + * has been confirmed and that the task can commit the checkpoint to the outside world. + */ +public class NotifyCheckpointComplete extends AbstractCheckpointMessage implements java.io.Serializable { + + private static final long serialVersionUID = 2094094662279578953L; + + /** The timestamp associated with the checkpoint */ + private final long timestamp; + + public NotifyCheckpointComplete(JobID job, ExecutionAttemptID taskExecutionId, long checkpointId, long timestamp) { + super(job, taskExecutionId, checkpointId); + this.timestamp = timestamp; + } + + // -------------------------------------------------------------------------------------------- + + public long getTimestamp() { + return timestamp; + } + + // -------------------------------------------------------------------------------------------- + + @Override + public int hashCode() { + return super.hashCode() + (int) (timestamp ^ (timestamp >>> 32)); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + else if (o instanceof NotifyCheckpointComplete) { + NotifyCheckpointComplete that = (NotifyCheckpointComplete) o; + return this.timestamp == that.timestamp && super.equals(o); + } + else 
{ + return false; + } + } + + @Override + public String toString() { + return String.format("ConfirmCheckpoint %d for (%s/%s)", + getCheckpointId(), getJob(), getTaskExecutionId()); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1b2fb08..616998c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.memorymanager.MemoryManager; @@ -883,11 +883,11 @@ public class Task implements Runnable { checkpointer.triggerCheckpoint(checkpointID, checkpointTimestamp); } catch (Throwable t) { - logger.error("Error while triggering checkpoint for " + taskName, t); + failExternally(new RuntimeException("Error while triggering checkpoint for " + taskName, t)); } } }; - executeAsyncCallRunnable(runnable, "Checkpoint Trigger"); + executeAsyncCallRunnable(runnable, "Checkpoint Trigger for " + taskName); } else { LOG.error("Task received a checkpoint request, but is not a checkpointing task - " @@ -899,15 +899,14 @@ public class Task implements Runnable { 
} } - public void confirmCheckpoint(final long checkpointID, - final SerializedValue<StateHandle<?>> state) { + public void notifyCheckpointComplete(final long checkpointID) { AbstractInvokable invokable = this.invokable; if (executionState == ExecutionState.RUNNING && invokable != null) { - if (invokable instanceof CheckpointCommittingOperator) { + if (invokable instanceof CheckpointNotificationOperator) { // build a local closure - final CheckpointCommittingOperator checkpointer = (CheckpointCommittingOperator) invokable; + final CheckpointNotificationOperator checkpointer = (CheckpointNotificationOperator) invokable; final Logger logger = LOG; final String taskName = taskNameWithSubtask; @@ -915,14 +914,15 @@ public class Task implements Runnable { @Override public void run() { try { - checkpointer.confirmCheckpoint(checkpointID, state); + checkpointer.notifyCheckpointComplete(checkpointID); } catch (Throwable t) { - logger.error("Error while confirming checkpoint for " + taskName, t); + // fail task if checkpoint confirmation failed. 
+ failExternally(new RuntimeException("Error while confirming checkpoint", t)); } } }; - executeAsyncCallRunnable(runnable, "Checkpoint Confirmation"); + executeAsyncCallRunnable(runnable, "Checkpoint Confirmation for " + taskName); } else { LOG.error("Task received a checkpoint commit notification, but is not a checkpoint committing task - " http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala index 8c816a1..520decd 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala @@ -36,7 +36,7 @@ import com.fasterxml.jackson.databind.ObjectMapper import grizzled.slf4j.Logger import org.apache.flink.configuration.{Configuration, ConfigConstants, GlobalConfiguration, IllegalConfigurationException} -import org.apache.flink.runtime.messages.checkpoint.{ConfirmCheckpoint, TriggerCheckpoint, AbstractCheckpointMessage} +import org.apache.flink.runtime.messages.checkpoint.{NotifyCheckpointComplete, TriggerCheckpoint, AbstractCheckpointMessage} import org.apache.flink.runtime.{StreamingMode, ActorSynchronousLogging, ActorLogMessages} import org.apache.flink.runtime.akka.AkkaUtils import org.apache.flink.runtime.blob.{BlobService, BlobCache} @@ -425,17 +425,16 @@ extends Actor with ActorLogMessages with ActorSynchronousLogging { log.debug(s"Taskmanager received a checkpoint request for unknown task $taskExecutionId.") } - case message: ConfirmCheckpoint => + case message: NotifyCheckpointComplete => val taskExecutionId = message.getTaskExecutionId val checkpointId = message.getCheckpointId val timestamp = 
message.getTimestamp - val state = message.getState log.debug(s"Receiver ConfirmCheckpoint ${checkpointId}@${timestamp} for $taskExecutionId.") val task = runningTasks.get(taskExecutionId) if (task != null) { - task.confirmCheckpoint(checkpointId, state) + task.notifyCheckpointComplete(checkpointId) } else { log.debug( s"Taskmanager received a checkpoint confirmation for unknown task $taskExecutionId.") http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java index d56704d..c02d301 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTest.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.Execution; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; -import org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.junit.Test; @@ -199,8 +199,8 @@ public class CheckpointCoordinatorTest { // validate that the relevant tasks got a confirmation message { - ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointId, timestamp, null); - ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointId, timestamp, null); + NotifyCheckpointComplete confirmMessage1 = new 
NotifyCheckpointComplete(jid, attemptID1, checkpointId, timestamp); + NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointId, timestamp); verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1)); verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); } @@ -237,8 +237,8 @@ public class CheckpointCoordinatorTest { verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(expectedMessage1), eq(attemptID1)); verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(expectedMessage2), eq(attemptID2)); - ConfirmCheckpoint confirmMessage1 = new ConfirmCheckpoint(jid, attemptID1, checkpointIdNew, timestampNew, null); - ConfirmCheckpoint confirmMessage2 = new ConfirmCheckpoint(jid, attemptID2, checkpointIdNew, timestampNew, null); + NotifyCheckpointComplete confirmMessage1 = new NotifyCheckpointComplete(jid, attemptID1, checkpointIdNew, timestampNew); + NotifyCheckpointComplete confirmMessage2 = new NotifyCheckpointComplete(jid, attemptID2, checkpointIdNew, timestampNew); verify(vertex1, times(1)).sendMessageToCurrentExecution(eq(confirmMessage1), eq(attemptID1)); verify(vertex2, times(1)).sendMessageToCurrentExecution(eq(confirmMessage2), eq(attemptID2)); } @@ -343,7 +343,7 @@ public class CheckpointCoordinatorTest { // the first confirm message should be out verify(commitVertex, times(1)).sendMessageToCurrentExecution( - new ConfirmCheckpoint(jid, commitAttemptID, checkpointId1, timestamp1, null), commitAttemptID); + new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId1, timestamp1), commitAttemptID); // send the last remaining ack for the second checkpoint coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId2)); @@ -355,7 +355,7 @@ public class CheckpointCoordinatorTest { // the second commit message should be out verify(commitVertex, times(1)).sendMessageToCurrentExecution( - new 
ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID); + new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID); // validate the committed checkpoints List<SuccessfulCheckpoint> scs = coord.getSuccessfulCheckpoints(); @@ -482,7 +482,7 @@ public class CheckpointCoordinatorTest { // the first confirm message should be out verify(commitVertex, times(1)).sendMessageToCurrentExecution( - new ConfirmCheckpoint(jid, commitAttemptID, checkpointId2, timestamp2, null), commitAttemptID); + new NotifyCheckpointComplete(jid, commitAttemptID, checkpointId2, timestamp2), commitAttemptID); // send the last remaining ack for the first checkpoint. This should not do anything coord.receiveAcknowledgeMessage(new AcknowledgeCheckpoint(jid, ackAttemptID3, checkpointId1)); @@ -551,7 +551,7 @@ public class CheckpointCoordinatorTest { // no confirm message must have been sent verify(commitVertex, times(0)) - .sendMessageToCurrentExecution(any(ConfirmCheckpoint.class), any(ExecutionAttemptID.class)); + .sendMessageToCurrentExecution(any(NotifyCheckpointComplete.class), any(ExecutionAttemptID.class)); coord.shutdown(); } http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 05255ac..211d5e3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -22,7 +22,7 @@ import static org.junit.Assert.*; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import 
org.apache.flink.runtime.messages.checkpoint.ConfirmCheckpoint; +import org.apache.flink.runtime.messages.checkpoint.NotifyCheckpointComplete; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.StateHandle; @@ -38,7 +38,7 @@ public class CheckpointMessagesTest { @Test public void testTriggerAndConfirmCheckpoint() { try { - ConfirmCheckpoint cc = new ConfirmCheckpoint(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L, null); + NotifyCheckpointComplete cc = new NotifyCheckpointComplete(new JobID(), new ExecutionAttemptID(), 45287698767345L, 467L); testSerializabilityEqualsHashCode(cc); TriggerCheckpoint tc = new TriggerCheckpoint(new JobID(), new ExecutionAttemptID(), 347652734L, 7576752L); http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java index 618c01f..b18bf2a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java @@ -40,12 +40,10 @@ import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNo import org.apache.flink.runtime.io.network.partition.ResultPartitionManager; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; 
import org.apache.flink.runtime.memorymanager.MemoryManager; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; import org.junit.AfterClass; import org.junit.Before; import org.junit.BeforeClass; @@ -134,7 +132,7 @@ public class TaskAsyncCallTest { for (int i = 1; i <= NUM_CALLS; i++) { task.triggerCheckpointBarrier(i, 156865867234L); - task.confirmCheckpoint(i, null); + task.notifyCheckpointComplete(i); } triggerLatch.await(); @@ -186,7 +184,7 @@ public class TaskAsyncCallTest { } public static class CheckpointsInOrderInvokable extends AbstractInvokable - implements CheckpointedOperator, CheckpointCommittingOperator { + implements CheckpointedOperator, CheckpointNotificationOperator { private volatile long lastCheckpointId = 0; @@ -213,7 +211,7 @@ public class TaskAsyncCallTest { } @Override - public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { + public void triggerCheckpoint(long checkpointId, long timestamp) { lastCheckpointId++; if (checkpointId == lastCheckpointId) { if (lastCheckpointId == NUM_CALLS) { @@ -229,7 +227,7 @@ public class TaskAsyncCallTest { } @Override - public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> state) throws Exception { + public void notifyCheckpointComplete(long checkpointId) { if (checkpointId != lastCheckpointId && this.error == null) { this.error = new Exception("calls out of order"); synchronized (this) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java 
b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java index 4286196..abe06b8 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/Utils.java @@ -27,16 +27,25 @@ import org.apache.flink.streaming.util.serialization.SerializationSchema; import java.io.IOException; +/** + * Utilities for the Kafka connector + */ public class Utils { - public static class TypeInformationSerializationSchema<T> - implements DeserializationSchema<T>, SerializationSchema<T, byte[]> { + + /** + * Utility serialization schema, created from Flink's TypeInformation system. + * @param <T> + */ + public static class TypeInformationSerializationSchema<T> implements DeserializationSchema<T>, SerializationSchema<T, byte[]> { private final TypeSerializer<T> serializer; private final TypeInformation<T> ti; + private transient DataOutputSerializer dos; - public TypeInformationSerializationSchema(Object type, ExecutionConfig ec) { - this.ti = (TypeInformation<T>) TypeExtractor.getForObject(type); + public TypeInformationSerializationSchema(T type, ExecutionConfig ec) { + this.ti = TypeExtractor.getForObject(type); this.serializer = ti.createSerializer(ec); } + @Override public T deserialize(byte[] message) { try { @@ -53,13 +62,22 @@ public class Utils { @Override public byte[] serialize(T element) { - DataOutputSerializer dos = new DataOutputSerializer(16); + if(dos == null) { + dos = new DataOutputSerializer(16); + } try { serializer.serialize(element, dos); } catch (IOException e) { throw new RuntimeException("Unable to serialize record", e); } - return dos.getByteArray(); + byte[] ret = dos.getByteArray(); + if(ret.length != dos.length()) { + byte[] n = new 
byte[dos.length()]; + System.arraycopy(ret, 0, n, 0, dos.length()); + ret = n; + } + dos.clear(); + return ret; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java index 6758f2c..b4a0b8b 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/api/persistent/PersistentKafkaSource.java @@ -20,7 +20,6 @@ package org.apache.flink.streaming.connectors.kafka.api.persistent; import java.io.IOException; import java.io.ObjectInputStream; import java.io.ObjectOutputStream; -import java.io.Serializable; import java.io.UnsupportedEncodingException; import java.util.Arrays; import java.util.Collections; @@ -41,12 +40,12 @@ import kafka.utils.ZkUtils; import org.I0Itec.zkclient.ZkClient; import org.I0Itec.zkclient.exception.ZkMarshallingError; import org.I0Itec.zkclient.serialize.ZkSerializer; -import org.apache.flink.api.common.state.OperatorState; +import org.apache.commons.collections.map.LinkedMap; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.typeutils.ResultTypeQueryable; import 
org.apache.flink.configuration.Configuration; -import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; +import org.apache.flink.streaming.api.checkpoint.CheckpointedAsynchronously; import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction; import org.apache.flink.streaming.util.serialization.DeserializationSchema; import org.apache.zookeeper.data.Stat; @@ -67,13 +66,14 @@ import com.google.common.base.Preconditions; */ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> implements ResultTypeQueryable<OUT>, - CheckpointCommitter { + CheckpointNotifier, CheckpointedAsynchronously<long[]> { private static final long serialVersionUID = 287845877188312621L; private static final Logger LOG = LoggerFactory.getLogger(PersistentKafkaSource.class); - + private final LinkedMap pendingCheckpoints = new LinkedMap(); + private final String topicName; private final DeserializationSchema<OUT> deserializationSchema; @@ -82,9 +82,11 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> private transient ConsumerConnector consumer; private transient ZkClient zkClient; - private transient OperatorState<long[]> lastOffsets; - private transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again. - + + private transient long[] lastOffsets; // Current offset (backed-up state) + protected transient long[] commitedOffsets; // maintain committed offsets, to avoid committing the same over and over again. + private transient long[] restoreState; // set by the restore() method, used by open() to validate the restored state. 
+ private volatile boolean running; /** @@ -145,23 +147,26 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> // most likely the number of offsets we're going to store here will be lower than the number of partitions. int numPartitions = getNumberOfPartitions(); LOG.debug("The topic {} has {} partitions", topicName, numPartitions); - this.lastOffsets = getRuntimeContext().getOperatorState("offset", new long[numPartitions], false); + this.lastOffsets = new long[numPartitions]; this.commitedOffsets = new long[numPartitions]; + // check if there are offsets to restore - if (!Arrays.equals(lastOffsets.value(), new long[numPartitions])) { - if (lastOffsets.value().length != numPartitions) { - throw new IllegalStateException("There are "+lastOffsets.value().length+" offsets to restore for topic "+topicName+" but " + + if (restoreState != null) { + if (restoreState.length != numPartitions) { + throw new IllegalStateException("There are "+restoreState.length+" offsets to restore for topic "+topicName+" but " + "there are only "+numPartitions+" in the topic"); } - LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(lastOffsets.value())); - setOffsetsInZooKeeper(lastOffsets.value()); + LOG.info("Setting restored offsets {} in ZooKeeper", Arrays.toString(restoreState)); + setOffsetsInZooKeeper(restoreState); + this.lastOffsets = restoreState; } else { // initialize empty offsets - Arrays.fill(this.lastOffsets.value(), -1); + Arrays.fill(this.lastOffsets, -1); } Arrays.fill(this.commitedOffsets, 0); // just to make it clear - + + pendingCheckpoints.clear(); running = true; } @@ -175,7 +180,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> while (running && iteratorToRead.hasNext()) { MessageAndMetadata<byte[], byte[]> message = iteratorToRead.next(); - if(lastOffsets.value()[message.partition()] >= message.offset()) { + if(lastOffsets[message.partition()] >= message.offset()) { 
LOG.info("Skipping message with offset {} from partition {}", message.offset(), message.partition()); continue; } @@ -188,7 +193,7 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> // make the state update and the element emission atomic synchronized (checkpointLock) { - lastOffsets.value()[message.partition()] = message.offset(); + lastOffsets[message.partition()] = message.offset(); ctx.collect(next); } @@ -210,19 +215,63 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> zkClient.close(); } + // ----------------- State Checkpointing ----------------- + + @Override + public long[] snapshotState(long checkpointId, long checkpointTimestamp) throws Exception { + if (lastOffsets == null) { + LOG.warn("State snapshot requested on not yet opened source. Returning null"); + return null; + } + + if (LOG.isInfoEnabled()) { + LOG.info("Snapshotting state. Offsets: {}, checkpoint id {}, timestamp {}", + Arrays.toString(lastOffsets), checkpointId, checkpointTimestamp); + } + + long[] currentOffsets = Arrays.copyOf(lastOffsets, lastOffsets.length); + + // the map may be asynchronously updated when committing to Kafka, so we synchronize + synchronized (pendingCheckpoints) { + pendingCheckpoints.put(checkpointId, currentOffsets); + } + + return currentOffsets; + } + + @Override + public void restoreState(long[] state) { + LOG.info("The state will be restored to {} in the open() method", Arrays.toString(state)); + this.restoreState = Arrays.copyOf(state, state.length); + } + /** * Notification on completed checkpoints * @param checkpointId The ID of the checkpoint that has been completed. 
* @throws Exception */ - @Override - public void commitCheckpoint(long checkpointId, String stateName, StateHandle<Serializable> state) throws Exception { + public void notifyCheckpointComplete(long checkpointId) throws Exception { LOG.info("Commit checkpoint {}", checkpointId); long[] checkpointOffsets; - checkpointOffsets = (long[]) state.getState(); + // the map may be asynchronously updated when snapshotting state, so we synchronize + synchronized (pendingCheckpoints) { + final int posInMap = pendingCheckpoints.indexOf(checkpointId); + if (posInMap == -1) { + LOG.warn("Unable to find pending checkpoint for id {}", checkpointId); + return; + } + + checkpointOffsets = (long[]) pendingCheckpoints.remove(posInMap); + // remove older checkpoints in map: + if (!pendingCheckpoints.isEmpty()) { + for(int i = 0; i < posInMap; i++) { + pendingCheckpoints.remove(0); + } + } + } if (LOG.isInfoEnabled()) { LOG.info("Committing offsets {} to ZooKeeper", Arrays.toString(checkpointOffsets)); @@ -254,11 +303,15 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> } protected void setOffset(int partition, long offset) { - if(commitedOffsets[partition] < offset) { - setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset); - commitedOffsets[partition] = offset; - } else { - LOG.debug("Ignoring offset {} for partition {} because it is already committed", offset, partition); + // synchronize because notifyCheckpointComplete is called using asynchronous worker threads (= multiple checkpoints might be confirmed concurrently) + synchronized (commitedOffsets) { + if(commitedOffsets[partition] < offset) { + LOG.info("Committed offsets {}, partition={}, offset={}, locking on {}", Arrays.toString(commitedOffsets), partition, offset, commitedOffsets.hashCode()); + setOffset(zkClient, consumerConfig.groupId(), topicName, partition, offset); + commitedOffsets[partition] = offset; + } else { + LOG.debug("Ignoring offset {} for partition {} 
because it is already committed", offset, partition); + } } } @@ -305,7 +358,6 @@ public class PersistentKafkaSource<OUT> extends RichParallelSourceFunction<OUT> return deserializationSchema.getProducedType(); } - // ---------------------- Zookeeper Serializer copied from Kafka (because it has private access there) ----------------- public static class KafkaZKStringSerializer implements ZkSerializer { http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java index 4b763b2..e039592 100644 --- a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/KafkaITCase.java @@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals; import java.io.File; import java.io.IOException; +import java.lang.reflect.Field; import java.net.InetAddress; import java.net.UnknownHostException; import java.util.ArrayList; @@ -46,6 +47,7 @@ import kafka.server.KafkaConfig; import kafka.server.KafkaServer; import org.I0Itec.zkclient.ZkClient; +import org.apache.commons.collections.map.LinkedMap; import org.apache.curator.test.TestingServer; import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.functions.MapFunction; @@ -182,6 +184,63 @@ public class KafkaITCase { zkClient.close(); 
} + // -------------------------- test checkpointing ------------------------ + @Test + public void testCheckpointing() throws Exception { + createTestTopic("testCheckpointing", 1, 1); + + Properties props = new Properties(); + props.setProperty("zookeeper.connect", zookeeperConnectionString); + props.setProperty("group.id", "testCheckpointing"); + props.setProperty("auto.commit.enable", "false"); + ConsumerConfig cc = new ConsumerConfig(props); + PersistentKafkaSource<String> source = new PersistentKafkaSource<String>("testCheckpointing", new FakeDeserializationSchema(), cc); + + + Field pendingCheckpointsField = PersistentKafkaSource.class.getDeclaredField("pendingCheckpoints"); + pendingCheckpointsField.setAccessible(true); + LinkedMap pendingCheckpoints = (LinkedMap) pendingCheckpointsField.get(source); + + + Assert.assertEquals(0, pendingCheckpoints.size()); + // first restore + source.restoreState(new long[]{1337}); + // then open + source.open(new Configuration()); + long[] state1 = source.snapshotState(1, 15); + Assert.assertArrayEquals(new long[]{1337}, state1); + long[] state2 = source.snapshotState(2, 30); + Assert.assertArrayEquals(new long[]{1337}, state2); + Assert.assertEquals(2, pendingCheckpoints.size()); + + source.notifyCheckpointComplete(1); + Assert.assertEquals(1, pendingCheckpoints.size()); + + source.notifyCheckpointComplete(2); + Assert.assertEquals(0, pendingCheckpoints.size()); + + source.notifyCheckpointComplete(666); // invalid checkpoint + Assert.assertEquals(0, pendingCheckpoints.size()); + + // create 500 snapshots + for(int i = 0; i < 500; i++) { + source.snapshotState(i, 15 * i); + } + Assert.assertEquals(500, pendingCheckpoints.size()); + + // commit only the second last + source.notifyCheckpointComplete(498); + Assert.assertEquals(1, pendingCheckpoints.size()); + + // access invalid checkpoint + source.notifyCheckpointComplete(490); + + // and the last + source.notifyCheckpointComplete(499); + Assert.assertEquals(0, 
pendingCheckpoints.size()); + } + + private static class FakeDeserializationSchema implements DeserializationSchema<String> { @Override @@ -220,6 +279,34 @@ public class KafkaITCase { zk.close(); } + + public static class TestPersistentKafkaSource<OUT> extends PersistentKafkaSource<OUT> { + private static Object sync = new Object(); + public static long[] finalOffset; + public TestPersistentKafkaSource(String topicName, DeserializationSchema<OUT> deserializationSchema, ConsumerConfig consumerConfig) { + super(topicName, deserializationSchema, consumerConfig); + } + + @Override + public void close() { + super.close(); + LOG.info("Starting close " +Arrays.toString(commitedOffsets)); + synchronized (sync) { + if (finalOffset == null) { + finalOffset = new long[commitedOffsets.length]; + } + for(int i = 0; i < commitedOffsets.length; i++) { + if(commitedOffsets[i] > 0) { + if(finalOffset[i] > 0) { + throw new RuntimeException("This is unexpected on i = "+i); + } + finalOffset[i] = commitedOffsets[i]; + } + } + } + LOG.info("Finished closing. Final "+Arrays.toString(finalOffset)); + } + } /** * We want to use the High level java consumer API but manage the offset in Zookeeper manually. * @@ -246,27 +333,35 @@ public class KafkaITCase { // write a sequence from 0 to 99 to each of the three partitions. writeSequence(env, topicName, 0, 99); - readSequence(env, standardCC, topicName, 0, 100, 300); + LOG.info("State in persistent kafka sources {}", TestPersistentKafkaSource.finalOffset); + // check offsets to be set at least higher than 50. // correctly, we would expect them to be set to 99, but right now there is no way of stopping a topology once all pending // checkpoints have been committed. // To work around that limitation, the persistent kafka consumer is throttled with a thread.sleep(). 
- long o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0); - long o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1); - long o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2); - Assert.assertTrue("The offset seems incorrect, got " + o1, o1 > 50L); - Assert.assertTrue("The offset seems incorrect, got " + o2, o2 > 50L); - Assert.assertTrue("The offset seems incorrect, got " + o3, o3 > 50L); - /** Once we have proper shutdown of streaming jobs, enable these tests - Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0)); - Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1)); - Assert.assertEquals("The offset seems incorrect", 99L, PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2));*/ + long o1 = -1, o2 = -1, o3 = -1; + if(TestPersistentKafkaSource.finalOffset[0] > 0) { + o1 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 0); + Assert.assertTrue("The offset seems incorrect, got " + o1, o1 == TestPersistentKafkaSource.finalOffset[0]); + } + if(TestPersistentKafkaSource.finalOffset[1] > 0) { + o2 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 1); + Assert.assertTrue("The offset seems incorrect, got " + o2, o2 == TestPersistentKafkaSource.finalOffset[1]); + } + if(TestPersistentKafkaSource.finalOffset[2] > 0) { + o3 = PersistentKafkaSource.getOffset(zk, standardCC.groupId(), topicName, 2); + Assert.assertTrue("The offset seems incorrect, got " + o3, o3 == TestPersistentKafkaSource.finalOffset[2]); + } + Assert.assertFalse("no offset has been set", TestPersistentKafkaSource.finalOffset[0] == 0 && + TestPersistentKafkaSource.finalOffset[1] == 0 && + TestPersistentKafkaSource.finalOffset[2] == 0); + LOG.info("Got final offsets from zookeeper o1={}, o2={}, o3={}", o1, o2, o3); 
LOG.info("Manipulating offsets"); - // set the offset to 25, 50, and 75 for the three partitions + // set the offset to 50 for the three partitions PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 0, 50); PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 1, 50); PersistentKafkaSource.setOffset(zk, standardCC.groupId(), topicName, 2, 50); @@ -283,20 +378,16 @@ public class KafkaITCase { private void readSequence(StreamExecutionEnvironment env, ConsumerConfig cc, final String topicName, final int valuesStartFrom, final int valuesCount, final int finalCount) throws Exception { LOG.info("Reading sequence for verification until final count {}", finalCount); - DataStream<Tuple2<Integer, Integer>> source = env.addSource( - new PersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1,1), env.getConfig()), cc) - ) - //add a sleeper mapper. Since there is no good way of "shutting down" a running topology, we have - // to play this trick. The problem is that we have to wait until all checkpoints are confirmed - .map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { - private static final long serialVersionUID = 1L; - - @Override - public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { - Thread.sleep(150); - return value; - } - }).setParallelism(3); + TestPersistentKafkaSource<Tuple2<Integer, Integer>> pks = new TestPersistentKafkaSource<Tuple2<Integer, Integer>>(topicName, new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(new Tuple2<Integer, Integer>(1, 1), env.getConfig()), cc); + DataStream<Tuple2<Integer, Integer>> source = env.addSource(pks).map(new MapFunction<Tuple2<Integer, Integer>, Tuple2<Integer, Integer>>() { + // we need to slow down the source so that it can participate in a few checkpoints. 
+ // Otherwise it would write its data into buffers and shut down. + @Override + public Tuple2<Integer, Integer> map(Tuple2<Integer, Integer> value) throws Exception { + Thread.sleep(50); + return value; + } + }); // verify data DataStream<Integer> validIndexes = source.flatMap(new RichFlatMapFunction<Tuple2<Integer, Integer>, Integer>() { @@ -312,7 +403,6 @@ public class KafkaITCase { LOG.info("Reader " + getRuntimeContext().getIndexOfThisSubtask() + " got " + value + " count=" + count + "/" + finalCount); // verify if we've seen everything - if (count == finalCount) { LOG.info("Received all values"); for (int i = 0; i < values.length; i++) { http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java new file mode 100644 index 0000000..32bd1d1 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-connectors/flink-connector-kafka/src/test/java/org/apache/flink/streaming/connectors/kafka/util/UtilsTest.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.connectors.kafka.util; + +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.connectors.kafka.Utils; +import org.junit.Assert; +import org.junit.Test; + +public class UtilsTest { + + /** + * Ensure that the returned byte array has the expected size + */ + @Test + public void testTypeInformationSerializationSchema() { + final ExecutionConfig ec = new ExecutionConfig(); + + Tuple2<Integer, Integer> test = new Tuple2<Integer, Integer>(1,666); + + Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>> ser = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, Integer>>(test, ec); + + byte[] res = ser.serialize(test); + Assert.assertEquals(8, res.length); + + Tuple2<Integer, Integer> another = ser.deserialize(res); + Assert.assertEquals(test.f0, another.f0); + Assert.assertEquals(test.f1, another.f1); + } + + @Test + public void testGrowing() { + final ExecutionConfig ec = new ExecutionConfig(); + + Tuple2<Integer, byte[]> test1 = new Tuple2<Integer, byte[]>(1, new byte[16]); + + Utils.TypeInformationSerializationSchema<Tuple2<Integer, byte[]>> ser = new Utils.TypeInformationSerializationSchema<Tuple2<Integer, byte[]>>(test1, ec); + + byte[] res = ser.serialize(test1); + Assert.assertEquals(24, res.length); + Tuple2<Integer, byte[]> another = ser.deserialize(res); + Assert.assertEquals(16, another.f1.length); + + test1 = new Tuple2<Integer, byte[]>(1, new byte[26]); + + res = ser.serialize(test1); + 
Assert.assertEquals(34, res.length); + another = ser.deserialize(res); + Assert.assertEquals(26, another.f1.length); + + test1 = new Tuple2<Integer, byte[]>(1, new byte[1]); + + res = ser.serialize(test1); + Assert.assertEquals(9, res.length); + another = ser.deserialize(res); + Assert.assertEquals(1, another.f1.length); + } + + +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java deleted file mode 100644 index 306df71..0000000 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointCommitter.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.flink.streaming.api.checkpoint; - -import java.io.Serializable; - -import org.apache.flink.runtime.state.StateHandle; - -/** - * This interface must be implemented by functions/operations that want to receive - * a commit notification once a checkpoint has been completely acknowledged by all - * participants. - */ -public interface CheckpointCommitter { - - /** - * This method is called as a notification once a distributed checkpoint has been completed. - * - * Note that any exception during this method will not cause the checkpoint to - * fail any more. - * - * @param checkpointId The ID of the checkpoint that has been completed. - * @param stateName The name of the committed state - * @param checkPointedState Handle to the state that was checkpointed with this checkpoint id. - * @throws Exception - */ - void commitCheckpoint(long checkpointId, String stateName, StateHandle<Serializable> checkPointedState) throws Exception; -} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java new file mode 100644 index 0000000..c2d2182 --- /dev/null +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/checkpoint/CheckpointNotifier.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.streaming.api.checkpoint; + +/** + * This interface must be implemented by functions/operations that want to receive + * a commit notification once a checkpoint has been completely acknowledged by all + * participants. + */ +public interface CheckpointNotifier { + + /** + * This method is called as a notification once a distributed checkpoint has been completed. + * + * Note that any exception during this method will not cause the checkpoint to + * fail any more. + * + * @param checkpointId The ID of the checkpoint that has been completed. 
+ * @throws Exception + */ + void notifyCheckpointComplete(long checkpointId) throws Exception; +} http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 4d541bc..c988150 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -394,15 +394,16 @@ public class StreamingJobGraphGenerator { List<JobVertexID> ackVertices = new ArrayList<JobVertexID>(jobVertices.size()); // collect the vertices that receive "commit checkpoint" messages - // currently, these are only the sources + // currently, these are all vertices List<JobVertexID> commitVertices = new ArrayList<JobVertexID>(); for (JobVertex vertex : jobVertices.values()) { if (vertex.isInputVertex()) { triggerVertices.add(vertex.getID()); - commitVertices.add(vertex.getID()); } + // TODO: add check whether the user function implements the checkpointing interface + commitVertices.add(vertex.getID()); ackVertices.add(vertex.getID()); } http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java index c128a7b..b2d9c91 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperator.java @@ -30,7 +30,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.PartitionedStateHandle; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandleProvider; -import org.apache.flink.streaming.api.checkpoint.CheckpointCommitter; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.state.StreamOperatorState; import org.apache.flink.streaming.runtime.tasks.StreamingRuntimeContext; @@ -134,11 +134,10 @@ public abstract class AbstractUdfStreamOperator<OUT, F extends Function & Serial } - public void confirmCheckpointCompleted(long checkpointId, String stateName, - StateHandle<Serializable> checkpointedState) throws Exception { - if (userFunction instanceof CheckpointCommitter) { + public void notifyCheckpointComplete(long checkpointId) throws Exception { + if (userFunction instanceof CheckpointNotifier) { try { - ((CheckpointCommitter) userFunction).commitCheckpoint(checkpointId, stateName, checkpointedState); + ((CheckpointNotifier) userFunction).notifyCheckpointComplete(checkpointId); } catch (Exception e) { throw new Exception("Error while confirming checkpoint " + checkpointId + " to the stream function", e); } 
http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java index 6b5a3e8..afc36e0 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java +++ b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/api/operators/StatefulStreamOperator.java @@ -36,5 +36,5 @@ public interface StatefulStreamOperator<OUT> extends StreamOperator<OUT> { Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> getStateSnapshotFromFunction(long checkpointId, long timestamp) throws Exception; - void confirmCheckpointCompleted(long checkpointId, String stateName, StateHandle<Serializable> checkpointedState) throws Exception; + void notifyCheckpointComplete(long checkpointId) throws Exception; } http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index 7421a33..f98ed2d 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ 
b/flink-staging/flink-streaming/flink-streaming-core/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -22,7 +22,6 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.Map.Entry; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.collections.functors.NotNullPredicate; @@ -33,7 +32,7 @@ import org.apache.flink.configuration.GlobalConfiguration; import org.apache.flink.runtime.event.task.TaskEvent; import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; -import org.apache.flink.runtime.jobgraph.tasks.CheckpointCommittingOperator; +import org.apache.flink.runtime.jobgraph.tasks.CheckpointNotificationOperator; import org.apache.flink.runtime.jobgraph.tasks.CheckpointedOperator; import org.apache.flink.runtime.jobgraph.tasks.OperatorStateCarrier; import org.apache.flink.runtime.state.FileStateHandle; @@ -41,7 +40,6 @@ import org.apache.flink.runtime.state.LocalStateHandle; import org.apache.flink.runtime.state.PartitionedStateHandle; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandleProvider; -import org.apache.flink.runtime.util.SerializedValue; import org.apache.flink.runtime.util.event.EventListener; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.StatefulStreamOperator; @@ -52,7 +50,7 @@ import org.slf4j.LoggerFactory; public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends AbstractInvokable implements - OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointCommittingOperator { + OperatorStateCarrier<StateHandle<Serializable>>, CheckpointedOperator, CheckpointNotificationOperator { private static final Logger LOG = LoggerFactory.getLogger(StreamTask.class); @@ -69,11 +67,11 @@ public abstract class StreamTask<OUT, O extends 
StreamOperator<OUT>> extends Abs protected volatile boolean isRunning = false; protected List<StreamingRuntimeContext> contexts; - + protected StreamingRuntimeContext headContext; protected ClassLoader userClassLoader; - + private EventListener<TaskEvent> superstepListener; public StreamTask() { @@ -86,11 +84,11 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs public void registerInputOutput() { this.userClassLoader = getUserCodeClassLoader(); this.configuration = new StreamConfig(getTaskConfiguration()); - + streamOperator = configuration.getStreamOperator(userClassLoader); outputHandler = new OutputHandler<OUT>(this); - + if (streamOperator != null) { // IterationHead and IterationTail don't have an Operator... @@ -110,13 +108,13 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs public StreamingRuntimeContext createRuntimeContext(StreamConfig conf) { Environment env = getEnvironment(); String operatorName = conf.getStreamOperator(userClassLoader).getClass().getSimpleName(); - + KeySelector<?,Serializable> statePartitioner = conf.getStatePartitioner(userClassLoader); - + return new StreamingRuntimeContext(operatorName, env, getUserCodeClassLoader(), getExecutionConfig(), statePartitioner, getStateHandleProvider()); } - + private StateHandleProvider<Serializable> getStateHandleProvider() { StateHandleProvider<Serializable> provider = configuration @@ -134,7 +132,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs } catch (Exception e) { throw new RuntimeException(backendName + " is not a valid state backend.\nSupported backends: jobmanager, filesystem."); } - + switch (backend) { case JOBMANAGER: LOG.info("State backend for state checkpoints is set to jobmanager."); @@ -152,7 +150,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs default: throw new RuntimeException("Backend " + backend + " is not supported yet."); } - + } else { 
LOG.info("Using user defined state backend for streaming checkpoints."); return provider; @@ -199,7 +197,7 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs @SuppressWarnings("unchecked") @Override public void setInitialState(StateHandle<Serializable> stateHandle) throws Exception { - + // We retrieve and restore the states for the chained operators. List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle.getState(); @@ -217,15 +215,15 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs @Override public void triggerCheckpoint(long checkpointId, long timestamp) throws Exception { - + synchronized (checkpointLock) { if (isRunning) { try { LOG.debug("Starting checkpoint {} on task {}", checkpointId, getName()); - + // We wrap the states of the chained operators in a list, marking non-stateful operators with null List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = new ArrayList<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>(); - + // A wrapper handle is created for the List of statehandles WrapperStateHandle stateHandle; try { @@ -240,17 +238,17 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs chainedStates.add(null); } } - + stateHandle = CollectionUtils.exists(chainedStates, NotNullPredicate.INSTANCE) ? 
new WrapperStateHandle(chainedStates) : null; } catch (Exception e) { throw new Exception("Error while drawing snapshot of the user state.", e); } - + // now emit the checkpoint barriers outputHandler.broadcastBarrier(checkpointId, timestamp); - + // now confirm the checkpoint if (stateHandle == null) { getEnvironment().acknowledgeCheckpoint(checkpointId); @@ -270,44 +268,24 @@ public abstract class StreamTask<OUT, O extends StreamOperator<OUT>> extends Abs @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public void confirmCheckpoint(long checkpointId, SerializedValue<StateHandle<?>> stateHandle) throws Exception { + public void notifyCheckpointComplete(long checkpointId) throws Exception { + // we do nothing here so far. this should call commit on the source function, for example synchronized (checkpointLock) { - if (stateHandle != null) { - List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>> chainedStates = (List<Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>>>) stateHandle - .deserializeValue(getUserCodeClassLoader()).getState(); - - for (int i = 0; i < chainedStates.size(); i++) { - Tuple2<StateHandle<Serializable>, Map<String, PartitionedStateHandle>> chainedState = chainedStates - .get(i); - StreamOperator<?> chainedOperator = outputHandler.getChainedOperators().get(i); - - if (chainedState != null) { - if (chainedState.f0 != null) { - ((StatefulStreamOperator) chainedOperator).confirmCheckpointCompleted( - checkpointId, null, chainedState.f0); - } + if (streamOperator instanceof StatefulStreamOperator) { + ((StatefulStreamOperator) streamOperator).notifyCheckpointComplete(checkpointId); + } - if (chainedState.f1 != null) { - if (chainedOperator instanceof StatefulStreamOperator) { - for (Entry<String, PartitionedStateHandle> stateEntry : chainedState.f1 - .entrySet()) { - for (StateHandle<Serializable> handle : stateEntry.getValue() - .getState().values()) { - ((StatefulStreamOperator) 
chainedOperator) - .confirmCheckpointCompleted(checkpointId, - stateEntry.getKey(), handle); - } - } - } - } + if (hasChainedOperators) { + for (StreamOperator<?> chainedOperator : outputHandler.getChainedOperators()) { + if (chainedOperator instanceof StatefulStreamOperator) { + ((StatefulStreamOperator) chainedOperator).notifyCheckpointComplete(checkpointId); } } } - } } - - + + // ------------------------------------------------------------------------ // Utilities // ------------------------------------------------------------------------ http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties ---------------------------------------------------------------------- diff --git a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties index 0b686e5..fcee1b5 100644 --- a/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties +++ b/flink-staging/flink-streaming/flink-streaming-core/src/test/resources/log4j-test.properties @@ -17,7 +17,7 @@ ################################################################################ # Set root logger level to DEBUG and its only appender to A1. -log4j.rootLogger=OFF, A1 +log4j.rootLogger=INFO, A1 # A1 is set to be a ConsoleAppender. 
log4j.appender.A1=org.apache.log4j.ConsoleAppender http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java ---------------------------------------------------------------------- diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java index a826eff..1730c63 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/StreamCheckpointingITCase.java @@ -23,26 +23,35 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; +import java.io.Serializable; import java.util.HashMap; import java.util.Map; import java.util.Random; import org.apache.flink.api.common.functions.RichFilterFunction; +import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.functions.RichMapFunction; import org.apache.flink.api.common.functions.RichReduceFunction; import org.apache.flink.api.common.state.OperatorState; +import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.state.StateHandle; +import org.apache.flink.streaming.api.checkpoint.CheckpointNotifier; import org.apache.flink.streaming.api.checkpoint.Checkpointed; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import 
org.apache.flink.streaming.api.functions.source.RichSourceFunction; import org.apache.flink.test.util.ForkableFlinkMiniCluster; +import org.apache.flink.util.Collector; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * A simple test that runs a streaming topology with checkpointing enabled. @@ -87,9 +96,8 @@ public class StreamCheckpointingITCase { fail("Failed to stop test cluster: " + e.getMessage()); } } - - - + + /** * Runs the following program: * http://git-wip-us.apache.org/repos/asf/flink/blob/aa5e5b30/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java ---------------------------------------------------------------------- diff --git a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java index 43285b5..23b8940 100644 --- a/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java +++ b/flink-yarn-tests/src/main/java/org/apache/flink/yarn/YarnTestBase.java @@ -467,7 +467,11 @@ public abstract class YarnTestBase { // check if thread died if(!runner.isAlive()) { sendOutput(); - Assert.fail("Runner thread died before the test was finished. Return value = " +runner.getReturnValue()); + if(runner.getReturnValue() != 0) { + Assert.fail("Runner thread died before the test was finished. Return value = " + runner.getReturnValue()); + } else { + LOG.info("Runner stopped earlier than expected with return value = 0"); + } } } sendOutput();
