[FLINK-1840] Fix BLOB manager on Windows This closes #578
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0afed4dc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0afed4dc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0afed4dc Branch: refs/heads/master Commit: 0afed4dcf9f9639fc90a8f18933be0516b9d7cd3 Parents: cc7eda1 Author: Fabian Hueske <fhue...@apache.org> Authored: Tue Apr 7 23:04:59 2015 +0200 Committer: Fabian Hueske <fhue...@apache.org> Committed: Wed Apr 8 09:52:15 2015 +0200 ---------------------------------------------------------------------- .../flink/runtime/blob/BlobServerConnection.java | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0afed4dc/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java index 50b1f24..4f242a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServerConnection.java @@ -30,6 +30,7 @@ import java.net.SocketException; import java.security.MessageDigest; import org.apache.flink.api.common.JobID; +import org.apache.flink.shaded.com.google.common.io.Files; import org.apache.flink.util.InstantiationUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -307,24 +308,18 @@ class BlobServerConnection extends Thread { md.update(buf, 0, bytesExpected); } } - + fos.close(); if (contentAddressable == NAME_ADDRESSABLE) { File storageFile = this.blobServer.getStorageLocation(jobID, key); - if (!incomingFile.renameTo(storageFile)) { - throw new IOException(String.format("Cannot move staging file %s to BLOB file %s", - incomingFile.getAbsolutePath(), storageFile.getAbsolutePath())); - } + Files.move(incomingFile, storageFile); incomingFile = null; outputStream.write(RETURN_OKAY); } else { BlobKey blobKey = new BlobKey(md.digest()); File storageFile = blobServer.getStorageLocation(blobKey); - if (!incomingFile.renameTo(storageFile)) { - throw new IOException(String.format("Cannot move staging file %s to BLOB file %s", - incomingFile.getAbsolutePath(), storageFile.getAbsolutePath())); - } + Files.move(incomingFile, storageFile); incomingFile = null; // Return computed key to client for validation