Repository: flink Updated Branches: refs/heads/master 4341c8a35 -> 2ba5f8733
[hotfix] Backwards compatible deserialization of RocksDB backend UUIDs Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2ba5f873 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2ba5f873 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2ba5f873 Branch: refs/heads/master Commit: 2ba5f8733694a4e52a20b12d949b8611026065d2 Parents: 4341c8a Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Thu Jul 20 11:03:18 2017 +0200 Committer: Stefan Richter <s.rich...@data-artisans.com> Committed: Thu Jul 20 14:18:36 2017 +0200 ---------------------------------------------------------------------- .../savepoint/SavepointV2Serializer.java | 12 +++++++++++- .../runtime/testingUtils/TestingCluster.scala | 19 +++---------------- 2 files changed, 14 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/2ba5f873/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java index c8d695f..4cbbfcf 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/savepoint/SavepointV2Serializer.java @@ -39,6 +39,7 @@ import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -391,8 +392,17 @@ class SavepointV2Serializer implements SavepointSerializer<SavepointV2> { Map<StateHandleID, StreamStateHandle> sharedStates = deserializeStreamStateHandleMap(dis); Map<StateHandleID, StreamStateHandle> privateStates = deserializeStreamStateHandleMap(dis); + UUID uuid; + + try { + uuid = UUID.fromString(backendId); + } catch (Exception ex) { + // compatibility with old format pre FLINK-6964: + uuid = UUID.nameUUIDFromBytes(backendId.getBytes(StandardCharsets.UTF_8)); + } + return new IncrementalKeyedStateHandle( - UUID.fromString(backendId), + uuid, keyGroupRange, checkpointId, sharedStates, http://git-wip-us.apache.org/repos/asf/flink/blob/2ba5f873/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala index 48c4d85..833cb61 100644 --- a/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala +++ b/flink-runtime/src/test/scala/org/apache/flink/runtime/testingUtils/TestingCluster.scala @@ -378,12 +378,6 @@ class TestingCluster( def requestCheckpoint(jobId: JobID, options : CheckpointOptions): String = { val jobManagerGateway = getLeaderGateway(timeout) - // wait until the cluster is ready to take a checkpoint. - val allRunning = jobManagerGateway.ask( - TestingJobManagerMessages.WaitForAllVerticesToBeRunning(jobId), timeout) - - Await.ready(allRunning, timeout) - // trigger checkpoint val result = Await.result( jobManagerGateway.ask(CheckpointRequest(jobId, options), timeout), timeout) @@ -395,16 +389,9 @@ class TestingCluster( // failed because tasks were not ready.This would not be required if // TestingJobManagerMessages.WaitForAllVerticesToBeRunning(...) works // properly. - if (fail.cause != null) { - val innerCause = fail.cause.getCause - if (innerCause != null - && innerCause.getMessage.contains("tasks not ready")) { - // retry if the tasks are not ready yet. - Thread.sleep(50) - return requestCheckpoint(jobId, options) - } - } - throw new IOException(fail.cause) + LOG.info("Test checkpoint attempt failed. Retry ...", fail.cause) + Thread.sleep(50) + requestCheckpoint(jobId, options) } case _ => throw new IllegalStateException("Trigger checkpoint failed") }