[FLINK-1578] [BLOB manager] Improve failure handling and add more failure tests.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cfce493f Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cfce493f Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cfce493f Branch: refs/heads/master Commit: cfce493feb70a49d2722dc2a0d79f845f7e0461a Parents: 47fed3d Author: Stephan Ewen <se...@apache.org> Authored: Wed Feb 18 12:16:35 2015 +0100 Committer: Stephan Ewen <se...@apache.org> Committed: Wed Feb 18 13:59:00 2015 +0100 ---------------------------------------------------------------------- .../flink/configuration/ConfigConstants.java | 55 +- .../org/apache/flink/runtime/AbstractID.java | 93 ++- .../apache/flink/runtime/blob/BlobCache.java | 164 +++-- .../apache/flink/runtime/blob/BlobClient.java | 700 ++++++++++++------- .../flink/runtime/blob/BlobConnection.java | 337 --------- .../flink/runtime/blob/BlobInputStream.java | 10 +- .../org/apache/flink/runtime/blob/BlobKey.java | 3 +- .../apache/flink/runtime/blob/BlobServer.java | 261 +++---- .../runtime/blob/BlobServerConnection.java | 466 ++++++++++++ .../flink/runtime/blob/BlobServerProtocol.java | 59 ++ .../apache/flink/runtime/blob/BlobService.java | 19 +- .../apache/flink/runtime/blob/BlobUtils.java | 122 +++- .../apache/flink/runtime/AbstractIDTest.java | 42 ++ .../runtime/blob/BlobCacheRetriesTest.java | 150 ++++ .../runtime/blob/BlobCacheSuccessTest.java | 121 ++++ .../flink/runtime/blob/BlobCacheTest.java | 121 ---- .../flink/runtime/blob/BlobClientTest.java | 169 +++-- .../runtime/blob/BlobServerDeleteTest.java | 323 +++++++++ .../flink/runtime/blob/BlobServerGetTest.java | 149 ++++ .../flink/runtime/blob/BlobServerPutTest.java | 402 +++++++++++ .../runtime/blob/TestingFailingBlobServer.java | 74 ++ .../BlobLibraryCacheManagerTest.java | 2 +- 22 files changed, 2755 insertions(+), 1087 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java ---------------------------------------------------------------------- diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java index a0bf365..42a3c9a 100644 --- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java +++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigConstants.java @@ -42,17 +42,6 @@ public final class ConfigConstants { public static final String DEFAULT_EXECUTION_RETRIES_KEY = "execution-retries.default"; // -------------------------------- Runtime ------------------------------- - - /** - * The config parameter defining the storage directory to be used by the blob server. - */ - public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory"; - - /** - * The config parameter defining the cleanup interval of the library cache manager. - */ - public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager" + - ".cleanup.interval"; /** * The config parameter defining the network address to connect to @@ -71,7 +60,32 @@ public final class ConfigConstants { * marked as failed. */ public static final String JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT_KEY = "jobmanager.max-heartbeat-delay-before-failure.msecs"; - + + /** + * The config parameter defining the storage directory to be used by the blob server. + */ + public static final String BLOB_STORAGE_DIRECTORY_KEY = "blob.storage.directory"; + + /** + * The config parameter defining number of retires for failed BLOB fetches. + */ + public static final String BLOB_FETCH_RETRIES_KEY = "blob.fetch.retries"; + + /** + * The config parameter defining the maximum number of concurrent BLOB fetches that the JobManager serves. 
+ */ + public static final String BLOB_FETCH_CONCURRENT_KEY = "blob.fetch.num-concurrent"; + + /** + * The config parameter defining the backlog of BLOB fetches on the JobManager + */ + public static final String BLOB_FETCH_BACKLOG_KEY = "blob.fetch.backlog"; + + /** + * The config parameter defining the cleanup interval of the library cache manager. + */ + public static final String LIBRARY_CACHE_MANAGER_CLEANUP_INTERVAL = "library-cache-manager.cleanup.interval"; + /** * The config parameter defining the task manager's IPC port from the configuration. */ @@ -405,7 +419,22 @@ public final class ConfigConstants { */ // 30 seconds (its enough to get to mars, should be enough to detect failure) public static final int DEFAULT_JOB_MANAGER_DEAD_TASKMANAGER_TIMEOUT = 30*1000; - + + /** + * Default number of retries for failed BLOB fetches. + */ + public static final int DEFAULT_BLOB_FETCH_RETRIES = 5; + + /** + * Default number of concurrent BLOB fetch operations. + */ + public static final int DEFAULT_BLOB_FETCH_CONCURRENT = 50; + + /** + * Default BLOB fetch connection backlog. + */ + public static final int DEFAULT_BLOB_FETCH_BACKLOG = 1000; + /** * The default network port the task manager expects incoming IPC connections. The {@code 0} means that * the TaskManager searches for a free port. http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java index 130e3eb..247a052 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/AbstractID.java @@ -16,11 +16,9 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime; import java.io.IOException; -import java.nio.ByteBuffer; import java.util.Random; import org.apache.flink.core.io.IOReadableWritable; @@ -103,48 +101,39 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j } // -------------------------------------------------------------------------------------------- - + + /** + * Gets the lower 64 bits of the ID. + * + * @return The lower 64 bits of the ID. + */ public long getLowerPart() { return lowerPart; } - - public long getUpperPart() { - return upperPart; - } - - // -------------------------------------------------------------------------------------------- /** - * Converts the given byte array to a long. + * Gets the upper 64 bits of the ID. * - * @param ba the byte array to be converted - * @param offset the offset indicating at which byte inside the array the conversion shall begin - * @return the long variable + * @return The upper 64 bits of the ID. */ - private static long byteArrayToLong(byte[] ba, int offset) { - long l = 0; - - for (int i = 0; i < SIZE_OF_LONG; ++i) { - l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3); - } - - return l; + public long getUpperPart() { + return upperPart; } /** - * Converts a long to a byte array. + * Gets the bytes underlying this ID. * - * @param l the long variable to be converted - * @param ba the byte array to store the result the of the conversion - * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored + * @return The bytes underlying this ID. 
*/ - private static void longToByteArray(final long l, final byte[] ba, final int offset) { - for (int i = 0; i < SIZE_OF_LONG; ++i) { - final int shift = i << 3; // i * 8 - ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift); - } + public byte[] getBytes() { + byte[] bytes = new byte[SIZE]; + longToByteArray(lowerPart, bytes, 0); + longToByteArray(upperPart, bytes, SIZE_OF_LONG); + return bytes; } - + + // -------------------------------------------------------------------------------------------- + // Serialization // -------------------------------------------------------------------------------------------- @Override @@ -159,17 +148,14 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j out.writeLong(this.upperPart); } - public void write(ByteBuffer buffer) { - buffer.putLong(this.lowerPart); - buffer.putLong(this.upperPart); - } - public void writeTo(ByteBuf buf) { buf.writeLong(this.lowerPart); buf.writeLong(this.upperPart); } // -------------------------------------------------------------------------------------------- + // Standard Utilities + // -------------------------------------------------------------------------------------------- @Override public boolean equals(Object obj) { @@ -203,4 +189,39 @@ public class AbstractID implements IOReadableWritable, Comparable<AbstractID>, j int diff2 = (this.lowerPart < o.lowerPart) ? -1 : ((this.lowerPart == o.lowerPart) ? 0 : 1); return diff1 == 0 ? diff2 : diff1; } + + // -------------------------------------------------------------------------------------------- + // Conversion Utilities + // -------------------------------------------------------------------------------------------- + + /** + * Converts the given byte array to a long. 
+ * + * @param ba the byte array to be converted + * @param offset the offset indicating at which byte inside the array the conversion shall begin + * @return the long variable + */ + private static long byteArrayToLong(byte[] ba, int offset) { + long l = 0; + + for (int i = 0; i < SIZE_OF_LONG; ++i) { + l |= (ba[offset + SIZE_OF_LONG - 1 - i] & 0xffL) << (i << 3); + } + + return l; + } + + /** + * Converts a long to a byte array. + * + * @param l the long variable to be converted + * @param ba the byte array to store the result of the conversion + * @param offset offset indicating at what position inside the byte array the result of the conversion shall be stored + */ + private static void longToByteArray(long l, byte[] ba, int offset) { + for (int i = 0; i < SIZE_OF_LONG; ++i) { + final int shift = i << 3; // i * 8 + ba[offset + SIZE_OF_LONG - 1 - i] = (byte) ((l & (0xffL << shift)) >>> shift); + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java index 40ec4e3..0d1b29c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobCache.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.Closeable; import java.io.File; import java.io.FileOutputStream; import java.io.IOException; @@ -40,9 +41,7 @@ import java.util.concurrent.atomic.AtomicBoolean; */ public final class BlobCache implements BlobService { - /** - * The log object used for debugging.
*/ private static final Logger LOG = LoggerFactory.getLogger(BlobCache.class); private final InetSocketAddress serverAddress; @@ -54,6 +53,9 @@ public final class BlobCache implements BlobService { /** Shutdown hook thread to ensure deletion of the storage directory. */ private final Thread shutdownHook; + /** The number of retries when the transfer fails */ + private final int numFetchRetries; + public BlobCache(InetSocketAddress serverAddress, Configuration configuration) { if (serverAddress == null || configuration == null) { @@ -62,80 +64,122 @@ public final class BlobCache implements BlobService { this.serverAddress = serverAddress; + // configure and create the storage directory String storageDirectory = configuration.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB cache storage directory " + storageDir); + // configure the number of fetch retries + final int fetchRetries = configuration.getInteger( + ConfigConstants.BLOB_FETCH_RETRIES_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_RETRIES); + if (fetchRetries >= 0) { + this.numFetchRetries = fetchRetries; + } + else { + LOG.warn("Invalid value for {}. System will attempt no retires on failed fetches of BLOBs.", + ConfigConstants.BLOB_FETCH_RETRIES_KEY); + this.numFetchRetries = 0; + } + // Add shutdown hook to delete storage directory shutdownHook = BlobUtils.addShutdownHook(this, LOG); } /** - * Returns the URL for the content-addressable BLOB with the given key. The method will first attempt to serve - * the BLOB from its local cache. If one or more BLOB are not in the cache, the method will try to download them - * from the BLOB server with the given address. + * Returns the URL for the BLOB with the given key. The method will first attempt to serve + * the BLOB from its local cache. If the BLOB is not in the cache, the method will try to download it + * from this cache's BLOB server. 
* - * @param requiredBlob - * the key of the desired content-addressable BLOB - * @return URL referring to the local storage location of the BLOB - * @throws IOException - * thrown if an I/O error occurs while downloading the BLOBs from the BLOB server + * @param requiredBlob The key of the desired BLOB. + * @return URL referring to the local storage location of the BLOB. + * @throws IOException Thrown if an I/O error occurs while downloading the BLOBs from the BLOB server. */ public URL getURL(final BlobKey requiredBlob) throws IOException { if (requiredBlob == null) { - throw new IllegalArgumentException("Required BLOB cannot be null."); + throw new IllegalArgumentException("BLOB key cannot be null."); } - BlobClient bc = null; - byte[] buf = null; - URL url = null; + final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); - try { - final File localJarFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); + if (!localJarFile.exists()) { - if (!localJarFile.exists()) { + final byte[] buf = new byte[BlobServerProtocol.BUFFER_SIZE]; - if (LOG.isDebugEnabled()) { - LOG.debug("Trying to download " + requiredBlob + " from " + serverAddress); - } + // loop over retries + int attempt = 0; + while (true) { - bc = new BlobClient(serverAddress); - buf = new byte[BlobServer.BUFFER_SIZE]; + if (attempt == 0) { + LOG.info("Downloading {} from {}", requiredBlob, serverAddress); + } else { + LOG.info("Downloading {} from {} (retry {})", requiredBlob, serverAddress, attempt); + } - InputStream is = null; - OutputStream os = null; try { - is = bc.get(requiredBlob); - os = new FileOutputStream(localJarFile); + BlobClient bc = null; + InputStream is = null; + OutputStream os = null; + + try { + bc = new BlobClient(serverAddress); + is = bc.get(requiredBlob); + os = new FileOutputStream(localJarFile); + + while (true) { + final int read = is.read(buf); + if (read < 0) { + break; + } + os.write(buf, 0, read); + } - while (true) { + // we do explicitly 
not use a finally block, because we want the closing + // in the regular case to throw exceptions and cause the writing to fail. + // Closing after a failure, however, should not throw further exceptions, so that + // we keep the root exception + os.close(); + os = null; + is.close(); + is = null; + bc.close(); + bc = null; - final int read = is.read(buf); - if (read < 0) { - break; + // success, we finished + break; + } + catch (Throwable t) { + // we use "catch (Throwable)" to keep the root exception. Otherwise that exception + // could be masked by an exception thrown while closing the resources + closeSilently(os); + closeSilently(is); + closeSilently(bc); + + if (t instanceof IOException) { + throw (IOException) t; + } else { + throw new IOException(t.getMessage(), t); } - - os.write(buf, 0, read); } - } finally { - if (is != null) { - is.close(); + } + catch (IOException e) { + String message = "Failed to fetch BLOB " + requiredBlob + " from " + serverAddress + '.'; + if (attempt < numFetchRetries) { + attempt++; + if (LOG.isDebugEnabled()) { + LOG.debug(message + " Retrying...", e); + } else { + LOG.error(message + " Retrying..."); + } } - if (os != null) { - os.close(); + else { + LOG.error(message + " No retries left.", e); + throw new IOException(message, e); } } - } - url = localJarFile.toURI().toURL(); - - - } finally { - if (bc != null) { - bc.close(); - } + } // end loop over retries } - return url; + return localJarFile.toURI().toURL(); } /** @@ -145,8 +189,10 @@ public final class BlobCache implements BlobService { public void delete(BlobKey key) throws IOException{ final File localFile = BlobUtils.getStorageLocation(storageDir, key); - if(localFile.exists()) { - localFile.delete(); + if (localFile.exists()) { + if (!localFile.delete()) { + LOG.warn("Failed to delete locally cached BLOB " + key + " at " + localFile.getAbsolutePath()); + } } } @@ -180,4 +226,24 @@ public final class BlobCache implements BlobService { } } } + + public File getStorageDir() {
+ return this.storageDir; + } + + // ------------------------------------------------------------------------ + // Miscellaneous + // ------------------------------------------------------------------------ + + private void closeSilently(Closeable closeable) { + if (closeable != null) { + try { + closeable.close(); + } catch (Throwable t) { + if (LOG.isDebugEnabled()) { + LOG.debug("Error while closing resource after BLOB transfer.", t); + } + } + } + } } http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java index 9a0479f..cb799c4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobClient.java @@ -19,30 +19,45 @@ package org.apache.flink.runtime.blob; import java.io.Closeable; +import java.io.EOFException; import java.io.FileNotFoundException; import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; import java.net.Socket; -import java.nio.ByteBuffer; import java.security.MessageDigest; -import org.apache.flink.runtime.AbstractID; +import org.apache.flink.util.InstantiationUtil; +import org.slf4j.LoggerFactory; +import org.slf4j.Logger; + import org.apache.flink.runtime.jobgraph.JobID; +import static org.apache.flink.runtime.blob.BlobServerProtocol.CONTENT_ADDRESSABLE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.JOB_ID_SCOPE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.NAME_ADDRESSABLE; +import static org.apache.flink.runtime.blob.BlobUtils.readFully; +import static org.apache.flink.runtime.blob.BlobUtils.readLength; +import static 
org.apache.flink.runtime.blob.BlobUtils.writeLength; +import static org.apache.flink.runtime.blob.BlobServerProtocol.BUFFER_SIZE; +import static org.apache.flink.runtime.blob.BlobServerProtocol.DELETE_OPERATION; +import static org.apache.flink.runtime.blob.BlobServerProtocol.GET_OPERATION; +import static org.apache.flink.runtime.blob.BlobServerProtocol.PUT_OPERATION; +import static org.apache.flink.runtime.blob.BlobServerProtocol.MAX_KEY_LENGTH; +import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_OKAY; +import static org.apache.flink.runtime.blob.BlobServerProtocol.RETURN_ERROR; + /** - * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET), or delete (DELETE) - * BLOBs. - * <p> - * This class is not thread-safe. + * The BLOB client can communicate with the BLOB server and either upload (PUT), download (GET), + * or delete (DELETE) BLOBs. */ public final class BlobClient implements Closeable { - /** - * The socket connection to the BLOB server. - */ - private Socket socket; + private static final Logger LOG = LoggerFactory.getLogger(BlobClient.class); + + /** The socket connection to the BLOB server. */ + private final Socket socket; /** * Instantiates a new BLOB client. 
@@ -52,71 +67,177 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if the connection to the BLOB server could not be established */ - public BlobClient(final InetSocketAddress serverAddress) throws IOException { - + public BlobClient(InetSocketAddress serverAddress) throws IOException { this.socket = new Socket(); try { this.socket.connect(serverAddress); - }catch(IOException e){ + } + catch(IOException e) { + BlobUtils.closeSilently(socket, LOG); throw new IOException("Could not connect to BlobServer at address " + serverAddress, e); } } + @Override + public void close() throws IOException { + this.socket.close(); + } + + public boolean isClosed() { + return this.socket.isClosed(); + } + + // -------------------------------------------------------------------------------------------- + // GET + // -------------------------------------------------------------------------------------------- + /** - * Constructs and writes the header data for a PUT request to the given output stream. + * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the + * server, a {@link FileNotFoundException} is thrown. + * + * @param jobID + * the job ID identifying the BLOB to download + * @param key + * the key identifying the BLOB to download + * @return an input stream to read the retrieved data from + * @throws IOException + * thrown if an I/O error occurs during the download + */ + public InputStream get(JobID jobID, String key) throws IOException { + if (key.length() > MAX_KEY_LENGTH) { + throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); + } + + if (this.socket.isClosed()) { + throw new IllegalStateException("BLOB Client is not connected. 
" + + "Client has been shut down or encountered an error before."); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("GET BLOB %s / \"%s\" from %s", jobID, key, socket.getLocalSocketAddress())); + } + + try { + OutputStream os = this.socket.getOutputStream(); + InputStream is = this.socket.getInputStream(); + + sendGetHeader(os, jobID, key, null); + receiveAndCheckResponse(is); + + return new BlobInputStream(is, null); + } + catch (Throwable t) { + BlobUtils.closeSilently(socket, LOG); + throw new IOException("GET operation failed: " + t.getMessage(), t); + } + } + + /** + * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a + * {@link FileNotFoundException} is thrown. * + * @param blobKey + * the BLOB key identifying the BLOB to download + * @return an input stream to read the retrieved data from + * @throws IOException + * thrown if an I/O error occurs during the download + */ + public InputStream get(BlobKey blobKey) throws IOException { + if (this.socket.isClosed()) { + throw new IllegalStateException("BLOB Client is not connected. " + + "Client has been shut down or encountered an error before."); + } + if (LOG.isDebugEnabled()) { + LOG.debug(String.format("GET content addressable BLOB %s from %s", blobKey, socket.getLocalSocketAddress())); + } + + try { + OutputStream os = this.socket.getOutputStream(); + InputStream is = this.socket.getInputStream(); + + // Send GET header + sendGetHeader(os, null, null, blobKey); + receiveAndCheckResponse(is); + + return new BlobInputStream(is, blobKey); + } + catch (Throwable t) { + BlobUtils.closeSilently(socket, LOG); + throw new IOException("GET operation failed: " + t.getMessage(), t); + } + } + + /** + * Constructs and writes the header data for a GET operation to the given output stream. 
+ * * @param outputStream - * the output stream to write the PUT header data to + * the output stream to write the header data to * @param jobID - * the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a - * content-addressable BLOB + * the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used + * to identify the BLOB on the server instead * @param key - * the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB - * @param buf - * an auxiliary buffer used for data serialization + * the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to + * identify the BLOB on the server instead + * @param blobKey + * the BLOB key to identify the BLOB to download if either the job ID or the regular key are + * <code>null</code> * @throws IOException * thrown if an I/O error occurs while writing the header data to the output stream */ - private void sendPutHeader(final OutputStream outputStream, final JobID jobID, final String key, final byte[] buf) - throws IOException { + private void sendGetHeader(OutputStream outputStream, JobID jobID, String key, BlobKey blobKey) throws IOException { // Signal type of operation - outputStream.write(BlobServer.PUT_OPERATION); + outputStream.write(GET_OPERATION); - // Check if PUT should be done in content-addressable manner + // Check if GET should be done in content-addressable manner if (jobID == null || key == null) { - outputStream.write(1); - } else { - outputStream.write(0); - // Send job ID - final ByteBuffer bb = ByteBuffer.wrap(buf); - jobID.write(bb); - outputStream.write(buf); - - // Send the key + outputStream.write(CONTENT_ADDRESSABLE); + blobKey.writeToOutputStream(outputStream); + } + else { + outputStream.write(NAME_ADDRESSABLE); + // Send job ID and key + outputStream.write(jobID.getBytes()); byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET); - 
BlobServer.writeLength(keyBytes.length, buf, outputStream); + writeLength(keyBytes.length, outputStream); outputStream.write(keyBytes); } } + private void receiveAndCheckResponse(InputStream is) throws IOException { + int response = is.read(); + if (response < 0) { + throw new EOFException("Premature end of response"); + } + if (response == RETURN_ERROR) { + Throwable cause = readExceptionFromStream(is); + throw new IOException("Server side error: " + cause.getMessage(), cause); + } + else if (response != RETURN_OKAY) { + throw new IOException("Unrecognized response"); + } + } + + + // -------------------------------------------------------------------------------------------- + // PUT + // -------------------------------------------------------------------------------------------- + /** * Uploads the data of the given byte array to the BLOB server in a content-addressable manner. - * + * * @param value * the buffer to upload * @return the computed BLOB key identifying the BLOB on the server * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - public BlobKey put(final byte[] value) throws IOException { - + public BlobKey put(byte[] value) throws IOException { return put(value, 0, value.length); } /** * Uploads data from the given byte array to the BLOB server in a content-addressable manner. - * + * * @param value * the buffer to upload data from * @param offset @@ -127,14 +248,13 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - public BlobKey put(final byte[] value, final int offset, final int len) throws IOException { - + public BlobKey put(byte[] value, int offset, int len) throws IOException { return putBuffer(null, null, value, offset, len); } /** * Uploads the data of the given byte array to the BLOB server and stores it under the given job ID and key. 
- * + * * @param jobId * the job ID to identify the uploaded data * @param key @@ -144,14 +264,13 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - public void put(final JobID jobId, final String key, final byte[] value) throws IOException { - + public void put(JobID jobId, String key, byte[] value) throws IOException { put(jobId, key, value, 0, value.length); } /** * Uploads data from the given byte array to the BLOB server and stores it under the given job ID and key. - * + * * @param jobId * the job ID to identify the uploaded data * @param key @@ -165,11 +284,9 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - public void put(final JobID jobId, final String key, final byte[] value, final int offset, final int len) - throws IOException { - - if (key.length() > BlobServer.MAX_KEY_LENGTH) { - throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH); + public void put(JobID jobId, String key, byte[] value, int offset, int len) throws IOException { + if (key.length() > MAX_KEY_LENGTH) { + throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); } putBuffer(jobId, key, value, offset, len); @@ -177,7 +294,7 @@ public final class BlobClient implements Closeable { /** * Uploads data from the given input stream to the BLOB server and stores it under the given job ID and key. 
- * + * * @param jobId * the job ID to identify the uploaded data * @param key @@ -188,10 +305,9 @@ public final class BlobClient implements Closeable { * thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the * BLOB server */ - public void put(final JobID jobId, final String key, final InputStream inputStream) throws IOException { - - if (key.length() > BlobServer.MAX_KEY_LENGTH) { - throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH); + public void put(JobID jobId, String key, InputStream inputStream) throws IOException { + if (key.length() > MAX_KEY_LENGTH) { + throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); } putInputStream(jobId, key, inputStream); @@ -199,7 +315,7 @@ public final class BlobClient implements Closeable { /** * Uploads the data from the given input stream to the BLOB server in a content-addressable manner. - * + * * @param inputStream * the input stream to read the data from * @return the computed BLOB key identifying the BLOB on the server @@ -207,93 +323,13 @@ public final class BlobClient implements Closeable { * thrown if an I/O error occurs while reading the data from the input stream or uploading the data to the * BLOB server */ - public BlobKey put(final InputStream inputStream) throws IOException { - + public BlobKey put(InputStream inputStream) throws IOException { return putInputStream(null, null, inputStream); } /** - * Deletes the BLOB identified by the given job ID and key from the BLOB server. 
- * - * @param jobId - * the job ID to identify the BLOB - * @param key - * the key to identify the BLOB - * @throws IOException - * thrown if an I/O error occurs while transferring the request to the BLOB server - */ - public void delete(final JobID jobId, final String key) throws IOException { - - if (jobId == null) { - throw new IllegalArgumentException("Argument jobID must not be null"); - } - - if (key == null) { - throw new IllegalArgumentException("Argument key must not be null"); - } - - if (key.length() > BlobServer.MAX_KEY_LENGTH) { - throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH); - } - - deleteInternal(jobId, key); - } - - /** - * Deletes all BLOBs belonging to the job with the given ID from the BLOB server - * - * @param jobId - * the job ID to identify the BLOBs to be deleted - * @throws IOException - * thrown if an I/O error occurs while transferring the request to the BLOB server - */ - public void deleteAll(final JobID jobId) throws IOException { - - if (jobId == null) { - throw new IllegalArgumentException("Argument jobID must not be null"); - } - - deleteInternal(jobId, null); - } - - /** - * Delete one or multiple BLOBs from the BLOB server. 
- * - * @param jobId - * the job ID to identify the BLOB(s) to be deleted - * @param key - * the key to identify the specific BLOB to delete or <code>null</code> to delete all BLOBs associated with - * the job - * @throws IOException - * thrown if an I/O error occurs while transferring the request to the BLOB server - */ - private void deleteInternal(final JobID jobId, final String key) throws IOException { - - final OutputStream os = this.socket.getOutputStream(); - final byte[] buf = new byte[AbstractID.SIZE]; - - // Signal type of operation - os.write(BlobServer.DELETE_OPERATION); - - // Send job ID - final ByteBuffer bb = ByteBuffer.wrap(buf); - jobId.write(bb); - os.write(buf); - - if (key == null) { - os.write(0); - } else { - os.write(1); - // Send the key - byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET); - BlobServer.writeLength(keyBytes.length, buf, os); - os.write(keyBytes); - } - } - - /** * Uploads data from the given byte buffer to the BLOB server. - * + * * @param jobId * the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable * manner @@ -311,56 +347,62 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - private BlobKey putBuffer(final JobID jobId, final String key, final byte[] value, final int offset, final int len) - throws IOException { + private BlobKey putBuffer(JobID jobId, String key, byte[] value, int offset, int len) throws IOException { + if (this.socket.isClosed()) { + throw new IllegalStateException("BLOB Client is not connected. " + + "Client has been shut down or encountered an error before."); + } - final OutputStream os = this.socket.getOutputStream(); - final MessageDigest md = (jobId == null || key == null) ? 
BlobUtils.createMessageDigest() : - null; - final byte[] buf = new byte[AbstractID.SIZE]; + if (LOG.isDebugEnabled()) { + if (jobId == null) { + LOG.debug(String.format("PUT content addressable BLOB buffer (%d bytes) to %s", + len, socket.getLocalSocketAddress())); + } else { + LOG.debug(String.format("PUT BLOB buffer (%d bytes) under %s / \"%s\" to %s", + len, jobId, key, socket.getLocalSocketAddress())); + } + } - // Send the PUT header - sendPutHeader(os, jobId, key, buf); + try { + final OutputStream os = this.socket.getOutputStream(); + final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null; - // Send the value in iterations of BUFFER_SIZE - int remainingBytes = value.length; - int bytesSent = 0; + // Send the PUT header + sendPutHeader(os, jobId, key); - while (remainingBytes > 0) { + // Send the value in iterations of BUFFER_SIZE + int remainingBytes = len; - final int bytesToSend = Math.min(BlobServer.BUFFER_SIZE, remainingBytes); - BlobServer.writeLength(bytesToSend, buf, os); + while (remainingBytes > 0) { + final int bytesToSend = Math.min(BUFFER_SIZE, remainingBytes); + writeLength(bytesToSend, os); - os.write(value, offset + bytesSent, bytesToSend); + os.write(value, offset, bytesToSend); - // Update the message digest if necessary - if (md != null) { - md.update(value, offset + bytesSent, bytesToSend); - } + // Update the message digest if necessary + if (md != null) { + md.update(value, offset, bytesToSend); + } - remainingBytes -= bytesToSend; - bytesSent += bytesToSend; - } + remainingBytes -= bytesToSend; + offset += bytesToSend; + } + // send -1 as the stream end + writeLength(-1, os); - if (md == null) { - return null; + // Receive blob key and compare + final InputStream is = this.socket.getInputStream(); + return receivePutResponseAndCompare(is, md); } - - // Receive blob key and compare - final InputStream is = this.socket.getInputStream(); - final BlobKey localKey = new BlobKey(md.digest()); - final BlobKey remoteKey 
= BlobKey.readFromInputStream(is); - - if (!localKey.equals(remoteKey)) { - throw new IOException("Detected data corruption during transfer"); + catch (Throwable t) { + BlobUtils.closeSilently(socket, LOG); + throw new IOException("PUT operation failed: " + t.getMessage(), t); } - - return localKey; } /** * Uploads data from the given input stream to the BLOB server. - * + * * @param jobId * the ID of the job the BLOB belongs to or <code>null</code> to store the BLOB in a content-addressable * manner @@ -374,143 +416,261 @@ public final class BlobClient implements Closeable { * @throws IOException * thrown if an I/O error occurs while uploading the data to the BLOB server */ - private BlobKey putInputStream(final JobID jobId, final String key, final InputStream inputStream) - throws IOException { - - final OutputStream os = this.socket.getOutputStream(); - final MessageDigest md = (jobId == null || key == null) ? BlobUtils.createMessageDigest - () : null; - final byte[] buf = new byte[AbstractID.SIZE]; - final byte[] xferBuf = new byte[BlobServer.BUFFER_SIZE]; - - // Send the PUT header - sendPutHeader(os, jobId, key, buf); - - while (true) { + private BlobKey putInputStream(JobID jobId, String key, InputStream inputStream) throws IOException { + if (this.socket.isClosed()) { + throw new IllegalStateException("BLOB Client is not connected. 
" + + "Client has been shut down or encountered an error before."); + } - final int read = inputStream.read(xferBuf); - if (read < 0) { - break; + if (LOG.isDebugEnabled()) { + if (jobId == null) { + LOG.debug(String.format("PUT content addressable BLOB stream to %s", + socket.getLocalSocketAddress())); + } else { + LOG.debug(String.format("PUT BLOB stream under %s / \"%s\" to %s", + jobId, key, socket.getLocalSocketAddress())); } - if (read > 0) { - BlobServer.writeLength(read, buf, os); - os.write(xferBuf, 0, read); - if (md != null) { - md.update(xferBuf, 0, read); + } + + try { + final OutputStream os = this.socket.getOutputStream(); + final MessageDigest md = jobId == null ? BlobUtils.createMessageDigest() : null; + final byte[] xferBuf = new byte[BUFFER_SIZE]; + + // Send the PUT header + sendPutHeader(os, jobId, key); + + while (true) { + final int read = inputStream.read(xferBuf); + if (read < 0) { + // we are done. send a -1 and be done + writeLength(-1, os); + break; + } + if (read > 0) { + writeLength(read, os); + os.write(xferBuf, 0, read); + if (md != null) { + md.update(xferBuf, 0, read); + } } } + + // Receive blob key and compare + final InputStream is = this.socket.getInputStream(); + return receivePutResponseAndCompare(is, md); + } + catch (Throwable t) { + BlobUtils.closeSilently(socket, LOG); + throw new IOException("PUT operation failed: " + t.getMessage(), t); } + } - if (md == null) { - return null; + private BlobKey receivePutResponseAndCompare(InputStream is, MessageDigest md) throws IOException { + int response = is.read(); + if (response < 0) { + throw new EOFException("Premature end of response"); } + else if (response == RETURN_OKAY) { + if (md == null) { + // not content addressable + return null; + } - // Receive blob key and compare - final InputStream is = this.socket.getInputStream(); - final BlobKey localKey = new BlobKey(md.digest()); - final BlobKey remoteKey = BlobKey.readFromInputStream(is); + BlobKey remoteKey = 
BlobKey.readFromInputStream(is); + BlobKey localKey = new BlobKey(md.digest()); - if (!localKey.equals(remoteKey)) { - throw new IOException("Detected data corruption during transfer"); - } + if (!localKey.equals(remoteKey)) { + throw new IOException("Detected data corruption during transfer"); + } - return localKey; + return localKey; + } + else if (response == RETURN_ERROR) { + Throwable cause = readExceptionFromStream(is); + throw new IOException("Server side error: " + cause.getMessage(), cause); + } + else { + throw new IOException("Unrecognized response"); + } } /** - * Downloads the BLOB identified by the given job ID and key from the BLOB server. If no such BLOB exists on the - * server, a {@link FileNotFoundException} is thrown. - * + * Constructs and writes the header data for a PUT request to the given output stream. + * NOTE: If the jobId and key are null, we send the data to the content addressable section. + * + * @param outputStream + * the output stream to write the PUT header data to * @param jobID - * the job ID identifying the BLOB to download + * the ID of job the BLOB belongs to or <code>null</code> to indicate the upload of a + * content-addressable BLOB * @param key - * the key identifying the BLOB to download - * @return an input stream to read the retrieved data from + * the key of the BLOB to upload or <code>null</code> to indicate the upload of a content-addressable BLOB * @throws IOException - * thrown if an I/O error occurs during the download + * thrown if an I/O error occurs while writing the header data to the output stream */ - public InputStream get(final JobID jobID, final String key) throws IOException { + private void sendPutHeader(OutputStream outputStream, JobID jobID, String key) throws IOException { + // sanity check that either both are null or both are not null + if ((jobID != null || key != null) && !(jobID != null && key != null)) { + throw new IllegalArgumentException(); + } - if (key.length() > 
BlobServer.MAX_KEY_LENGTH) { - throw new IllegalArgumentException("Keys must not be longer than " + BlobServer.MAX_KEY_LENGTH); + // Signal type of operation + outputStream.write(PUT_OPERATION); + + // Check if PUT should be done in content-addressable manner + if (jobID == null) { + outputStream.write(CONTENT_ADDRESSABLE); } + else { + outputStream.write(NAME_ADDRESSABLE); + // Send job ID and the key + byte[] idBytes = jobID.getBytes(); + byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET); + outputStream.write(idBytes); + writeLength(keyBytes.length, outputStream); + outputStream.write(keyBytes); + } + } - final OutputStream os = this.socket.getOutputStream(); - final byte[] buf = new byte[AbstractID.SIZE]; + // -------------------------------------------------------------------------------------------- + // DELETE + // -------------------------------------------------------------------------------------------- - // Send GET header - sendGetHeader(os, jobID, key, null, buf); + /** + * Deletes the BLOB identified by the given BLOB key from the BLOB server. + * + * @param key + * the key to identify the BLOB + * @throws IOException + * thrown if an I/O error occurs while transferring the request to the BLOB server + */ + public void delete(BlobKey key) throws IOException { + if (key == null) { + throw new IllegalArgumentException("BLOB key must not be null"); + } - return new BlobInputStream(this.socket.getInputStream(), null, buf); + deleteInternal(null, null, key); } /** - * Downloads the BLOB identified by the given BLOB key from the BLOB server. If no such BLOB exists on the server, a - * {@link FileNotFoundException} is thrown. - * - * @param blobKey - * the BLOB key identifying the BLOB to download - * @return an input stream to read the retrieved data from + * Deletes the BLOB identified by the given job ID and key from the BLOB server. 
+ * + * @param jobId + * the job ID to identify the BLOB + * @param key + * the key to identify the BLOB * @throws IOException - * thrown if an I/O error occurs during the download + * thrown if an I/O error occurs while transferring the request to the BLOB server */ - public InputStream get(final BlobKey blobKey) throws IOException { + public void delete(JobID jobId, String key) throws IOException { + if (jobId == null) { + throw new IllegalArgumentException("JobID must not be null"); + } + if (key == null) { + throw new IllegalArgumentException("Key must not be null"); + } + if (key.length() > MAX_KEY_LENGTH) { + throw new IllegalArgumentException("Keys must not be longer than " + MAX_KEY_LENGTH); + } - final OutputStream os = this.socket.getOutputStream(); - final byte[] buf = new byte[AbstractID.SIZE]; + deleteInternal(jobId, key, null); + } - // Send GET header - sendGetHeader(os, null, null, blobKey, buf); + /** + * Deletes all BLOBs belonging to the job with the given ID from the BLOB server + * + * @param jobId + * the job ID to identify the BLOBs to be deleted + * @throws IOException + * thrown if an I/O error occurs while transferring the request to the BLOB server + */ + public void deleteAll(JobID jobId) throws IOException { + if (jobId == null) { + throw new IllegalArgumentException("Argument jobID must not be null"); + } - return new BlobInputStream(this.socket.getInputStream(), blobKey, buf); + deleteInternal(jobId, null, null); } /** - * Constructs and writes the header data for a GET operation to the given output stream. 
- * - * @param outputStream - * the output stream to write the header data to - * @param jobID - * the job ID identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used - * to identify the BLOB on the server instead - * @param key - * the key identifying the BLOB to download or <code>null</code> to indicate the BLOB key should be used to - * identify the BLOB on the server instead - * @param key2 - * the BLOB key to identify the BLOB to download if either the job ID or the regular key are - * <code>null</code> - * @param buf - * auxiliary buffer used for data serialization - * @throws IOException - * thrown if an I/O error occurs while writing the header data to the output stream + * Delete one or multiple BLOBs from the BLOB server. + * + * @param jobId The job ID to identify the BLOB(s) to be deleted. + * @param key The key to identify the specific BLOB to delete or <code>null</code> to delete + * all BLOBs associated with the job id. + * @param bKey The blob key to identify a specific content addressable BLOB. This parameter + * is exclusive with jobId and key. + * @throws IOException Thrown if an I/O error occurs while transferring the request to the BLOB server. 
*/ - private void sendGetHeader(final OutputStream outputStream, final JobID jobID, final String key, - final BlobKey key2, final byte[] buf) throws IOException { + private void deleteInternal(JobID jobId, String key, BlobKey bKey) throws IOException { + if ((jobId != null && bKey != null) || (jobId == null && bKey == null)) { + throw new IllegalArgumentException(); + } - // Signal type of operation - outputStream.write(BlobServer.GET_OPERATION); + try { + final OutputStream outputStream = this.socket.getOutputStream(); + final InputStream inputStream = this.socket.getInputStream(); - // Check if GET should be done in content-addressable manner - if (jobID == null || key == null) { - outputStream.write(1); - key2.writeToOutputStream(outputStream); - } else { - outputStream.write(0); - // Send job ID - final ByteBuffer bb = ByteBuffer.wrap(buf); - jobID.write(bb); - outputStream.write(buf); - - // Send the key - byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET); - BlobServer.writeLength(keyBytes.length, buf, outputStream); - outputStream.write(keyBytes); + // Signal type of operation + outputStream.write(DELETE_OPERATION); + + // Check if DELETE should be done in content-addressable manner + if (jobId == null) { + // delete blob key + outputStream.write(CONTENT_ADDRESSABLE); + bKey.writeToOutputStream(outputStream); + } + else if (key != null) { + // delete BLOB for jobID and name key + outputStream.write(NAME_ADDRESSABLE); + // Send job ID and the key + byte[] idBytes = jobId.getBytes(); + byte[] keyBytes = key.getBytes(BlobUtils.DEFAULT_CHARSET); + outputStream.write(idBytes); + writeLength(keyBytes.length, outputStream); + outputStream.write(keyBytes); + } + else { + // delete all blobs for JobID + outputStream.write(JOB_ID_SCOPE); + byte[] idBytes = jobId.getBytes(); + outputStream.write(idBytes); + } + + int response = inputStream.read(); + if (response < 0) { + throw new EOFException("Premature end of response"); + } + if (response == RETURN_ERROR) { + 
Throwable cause = readExceptionFromStream(inputStream); + throw new IOException("Server side error: " + cause.getMessage(), cause); + } + else if (response != RETURN_OKAY) { + throw new IOException("Unrecognized response"); + } + } + catch (Throwable t) { + BlobUtils.closeSilently(socket, LOG); + throw new IOException("DELETE operation failed: " + t.getMessage(), t); } } - @Override - public void close() throws IOException { + // -------------------------------------------------------------------------------------------- + // Miscellaneous + // -------------------------------------------------------------------------------------------- - this.socket.close(); + private static Throwable readExceptionFromStream(InputStream in) throws IOException { + int len = readLength(in); + byte[] bytes = new byte[len]; + readFully(in, bytes, 0, len, "Error message"); + + try { + return (Throwable) InstantiationUtil.deserializeObject(bytes, ClassLoader.getSystemClassLoader()); + } + catch (ClassNotFoundException e) { + // should never occur + throw new IOException("Could not transfer error message", e); + } } } http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java deleted file mode 100644 index 3b7a31b..0000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobConnection.java +++ /dev/null @@ -1,337 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.blob; - -import java.io.EOFException; -import java.io.File; -import java.io.FileInputStream; -import java.io.FileOutputStream; -import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; -import java.net.Socket; -import java.nio.ByteBuffer; -import java.security.MessageDigest; - -import org.apache.flink.runtime.jobgraph.JobID; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A BLOB connection handles a series of requests from a particular BLOB client. - * <p> - * This class it thread-safe. - */ -class BlobConnection extends Thread { - - /** - * The log object used for debugging. - */ - private static final Logger LOG = LoggerFactory.getLogger(BlobConnection.class); - - /** - * The socket to communicate with the client. - */ - private final Socket clientSocket; - - /** - * The BLOB server. 
- */ - private final BlobServer blobServer; - - /** - * Creates a new BLOB connection for a client request - * - * @param clientSocket - * the socket to read/write data - * @param blobServer - * the BLOB server - */ - BlobConnection(final Socket clientSocket, final BlobServer blobServer) { - super("BLOB connection for " + clientSocket.getRemoteSocketAddress().toString()); - - this.clientSocket = clientSocket; - this.blobServer = blobServer; - } - - @Override - public void run() { - - try { - - final InputStream inputStream = this.clientSocket.getInputStream(); - final OutputStream outputStream = this.clientSocket.getOutputStream(); - final byte[] buffer = new byte[BlobServer.BUFFER_SIZE]; - - while (true) { - - // Read the requested operation - final int operation = inputStream.read(); - if (operation < 0) { - return; - } - - switch (operation) { - case BlobServer.PUT_OPERATION: - put(inputStream, outputStream, buffer); - break; - case BlobServer.GET_OPERATION: - get(inputStream, outputStream, buffer); - break; - case BlobServer.DELETE_OPERATION: - delete(inputStream, buffer); - break; - default: - throw new IOException("Unknown operation " + operation); - } - } - - } catch (IOException ioe) { - if (LOG.isErrorEnabled()) { - LOG.error("Error while executing BLOB connection.", ioe); - } - } finally { - closeSilently(this.clientSocket); - } - } - - /** - * Handles an incoming GET request from a BLOB client. 
- * - * @param inputStream - * the input stream to read incoming data from - * @param outputStream - * the output stream to send data back to the client - * @param buf - * an auxiliary buffer for data serialization/deserialization - * @throws IOException - * thrown if an I/O error occurs while reading/writing data from/to the respective streams - */ - private void get(final InputStream inputStream, final OutputStream outputStream, final byte[] buf) - throws IOException { - - File blob = null; - - final int contentAdressable = inputStream.read(); - if (contentAdressable < 0) { - throw new EOFException("Expected GET header"); - } - - if (contentAdressable == 0) { - // Receive the job ID - BlobServer.readFully(inputStream, buf, 0, JobID.SIZE); - final ByteBuffer bb = ByteBuffer.wrap(buf); - final JobID jobID = JobID.fromByteBuffer(bb); - // Receive the key - final String key = readKey(buf, inputStream); - blob = this.blobServer.getStorageLocation(jobID, key); - } else { - final BlobKey key = BlobKey.readFromInputStream(inputStream); - blob = blobServer.getStorageLocation(key); - } - - // Check if BLOB exists - if (!blob.exists()) { - BlobServer.writeLength(-1, buf, outputStream); - return; - } - - BlobServer.writeLength((int) blob.length(), buf, outputStream); - FileInputStream fis = null; - try { - fis = new FileInputStream(blob); - - while (true) { - - final int read = fis.read(buf); - if (read < 0) { - break; - } - outputStream.write(buf, 0, read); - } - - } finally { - if (fis != null) { - fis.close(); - } - } - } - - /** - * Handles an incoming PUT request from a BLOB client. 
- * - * @param inputStream - * the input stream to read incoming data from - * @param outputStream - * the output stream to send data back to the client - * @param buf - * an auxiliary buffer for data serialization/deserialization - * @throws IOException - * thrown if an I/O error occurs while reading/writing data from/to the respective streams - */ - private void put(final InputStream inputStream, final OutputStream outputStream, final byte[] buf) - throws IOException { - - JobID jobID = null; - String key = null; - MessageDigest md = null; - final int contentAdressable = inputStream.read(); - if (contentAdressable < 0) { - throw new EOFException("Expected PUT header"); - } - - if (contentAdressable == 0) { - // Receive the job ID - BlobServer.readFully(inputStream, buf, 0, JobID.SIZE); - final ByteBuffer bb = ByteBuffer.wrap(buf); - jobID = JobID.fromByteBuffer(bb); - // Receive the key - key = readKey(buf, inputStream); - } else { - md = BlobUtils.createMessageDigest(); - } - - File incomingFile = null; - FileOutputStream fos = null; - - try { - incomingFile = blobServer.getTemporaryFilename(); - fos = new FileOutputStream(incomingFile); - - while (true) { - - final int bytesExpected = BlobServer.readLength(buf, inputStream); - if (bytesExpected > BlobServer.BUFFER_SIZE) { - throw new IOException("Unexpected number of incoming bytes: " + bytesExpected); - } - - BlobServer.readFully(inputStream, buf, 0, bytesExpected); - fos.write(buf, 0, bytesExpected); - - if (md != null) { - md.update(buf, 0, bytesExpected); - } - - if (bytesExpected < BlobServer.BUFFER_SIZE) { - break; - } - } - - fos.close(); - fos = null; - - if (contentAdressable == 0) { - final File storageFile = this.blobServer.getStorageLocation(jobID, key); - incomingFile.renameTo(storageFile); - incomingFile = null; - } else { - final BlobKey blobKey = new BlobKey(md.digest()); - final File storageFile = blobServer.getStorageLocation(blobKey); - incomingFile.renameTo(storageFile); - incomingFile = 
null; - - // Return computed key to client for validation - blobKey.writeToOutputStream(outputStream); - } - } finally { - if (fos != null) { - fos.close(); - } - if (incomingFile != null) { - incomingFile.delete(); - } - } - } - - /** - * Handles an incoming DELETE request from a BLOB client. - * - * @param inputStream - * the input stream to read the request from. - * @param buf - * an auxiliary buffer for data deserialization - * @throws IOException - * thrown if an I/O error occurs while reading the request data from the input stream - */ - private void delete(final InputStream inputStream, final byte[] buf) throws IOException { - - // Receive the job ID - BlobServer.readFully(inputStream, buf, 0, JobID.SIZE); - final ByteBuffer bb = ByteBuffer.wrap(buf); - final JobID jobID = JobID.fromByteBuffer(bb); - String key = null; - - final int r = inputStream.read(); - if (r < 0) { - throw new EOFException(); - } - if (r > 0) { - // Delete individual BLOB - // Receive the key - key = readKey(buf, inputStream); - - final File blob = this.blobServer.getStorageLocation(jobID, key); - blob.delete(); - - } else { - // Delete all BLOBs for this job - blobServer.deleteJobDirectory(jobID); - } - } - - /** - * Auxiliary method to silently close a {@link Socket}. - * - * @param socket - * the socket to close - */ - static void closeSilently(final Socket socket) { - - try { - if (socket != null) { - socket.close(); - } - } catch (IOException ioe) { - } - } - - /** - * Reads the key of a BLOB from the given input stream. 
- * - * @param buf - * auxiliary buffer to data deserialization - * @param inputStream - * the input stream to read the key from - * @return the key of a BLOB - * @throws IOException - * thrown if an I/O error occurs while reading the key data from the input stream - */ - private static String readKey(final byte[] buf, - final InputStream inputStream) throws IOException { - - final int keyLength = BlobServer.readLength(buf, inputStream); - if (keyLength > BlobServer.MAX_KEY_LENGTH) { - throw new IOException("Unexpected key length " + keyLength); - } - - BlobServer.readFully(inputStream, buf, 0, keyLength); - - return new String(buf, 0, keyLength, BlobUtils.DEFAULT_CHARSET); - } -} http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java index 3654f8f..a89a461 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobInputStream.java @@ -24,6 +24,8 @@ import java.io.IOException; import java.io.InputStream; import java.security.MessageDigest; +import static org.apache.flink.runtime.blob.BlobUtils.readLength; + /** * The BLOB input stream is a special implementation of an {@link InputStream} to read the results of a GET operation * from the BLOB server. @@ -63,15 +65,13 @@ final class BlobInputStream extends InputStream { * the underlying input stream to read from * @param blobKey * the expected BLOB key for content-addressable BLOBs, <code>null</code> for non-content-addressable BLOBs. 
- * @param buf - * auxiliary buffer to read the meta data from the BLOB server * @throws IOException * throws if an I/O error occurs while reading the BLOB data from the BLOB server */ - BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey, final byte[] buf) throws IOException { + BlobInputStream(final InputStream wrappedInputStream, final BlobKey blobKey) throws IOException { this.wrappedInputStream = wrappedInputStream; this.blobKey = blobKey; - this.bytesToReceive = BlobServer.readLength(buf, wrappedInputStream); + this.bytesToReceive = readLength(wrappedInputStream); if (this.bytesToReceive < 0) { throw new FileNotFoundException(); } @@ -157,7 +157,7 @@ final class BlobInputStream extends InputStream { @Override public int available() throws IOException { - return 0; + return this.bytesToReceive - this.bytesReceived; } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java index e3d237d..bd254dd 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobKey.java @@ -16,7 +16,6 @@ * limitations under the License. 
*/ - package org.apache.flink.runtime.blob; import java.io.EOFException; @@ -142,7 +141,7 @@ public final class BlobKey implements Serializable, Comparable<BlobKey> { while (bytesRead < BlobKey.SIZE) { final int read = inputStream.read(key, bytesRead, BlobKey.SIZE - bytesRead); if (read < 0) { - throw new EOFException(); + throw new EOFException("Read an incomplete BLOB key"); } bytesRead += read; } http://git-wip-us.apache.org/repos/asf/flink/blob/cfce493f/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java index b27af03..c0e81f1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/blob/BlobServer.java @@ -18,14 +18,15 @@ package org.apache.flink.runtime.blob; -import java.io.EOFException; import java.io.File; import java.io.FileNotFoundException; import java.io.IOException; -import java.io.InputStream; -import java.io.OutputStream; import java.net.ServerSocket; import java.net.URL; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -42,61 +43,32 @@ import org.slf4j.LoggerFactory; * spawning threads to handle these requests. Furthermore, it takes care of creating the directory structure to store * the BLOBs or temporarily cache them. */ -public final class BlobServer extends Thread implements BlobService { +public class BlobServer extends Thread implements BlobService { - /** - * The log object used for debugging. - */ + /** The log object used for debugging. 
*/ private static final Logger LOG = LoggerFactory.getLogger(BlobServer.class); - /** - * The buffer size in bytes for network transfers. - */ - static final int BUFFER_SIZE = 4096; - - /** - * The maximum key length allowed for storing BLOBs. - */ - static final int MAX_KEY_LENGTH = 64; - - /** - * Internal code to identify a PUT operation. - */ - static final byte PUT_OPERATION = 0; - - /** - * Internal code to identify a GET operation. - */ - static final byte GET_OPERATION = 1; - - /** - * Internal code to identify a DELETE operation. - */ - static final byte DELETE_OPERATION = 2; - - /** - * Counter to generate unique names for temporary files. - */ + /** Counter to generate unique names for temporary files. */ private final AtomicInteger tempFileCounter = new AtomicInteger(0); - /** - * The server socket listening for incoming connections. - */ + /** The server socket listening for incoming connections. */ private final ServerSocket serverSocket; - /** - * Indicates whether a shutdown of server component has been requested. - */ - private AtomicBoolean shutdownRequested = new AtomicBoolean(); + /** Indicates whether a shutdown of server component has been requested. */ + private final AtomicBoolean shutdownRequested = new AtomicBoolean(); /** Shutdown hook thread to ensure deletion of the storage directory. */ private final Thread shutdownHook; - /** - * Is the root directory for file storage - */ + /** Is the root directory for file storage */ private final File storageDir; + /** Set of currently running threads */ + private final Set<BlobServerConnection> activeConnections = new HashSet<BlobServerConnection>(); + + /** The maximum number of concurrent connections */ + private final int maxConnections; + /** * Instantiates a new BLOB server and binds it to a free network port. 
* @@ -105,38 +77,57 @@ public final class BlobServer extends Thread implements BlobService { */ public BlobServer(Configuration config) throws IOException { + // configure and create the storage directory String storageDirectory = config.getString(ConfigConstants.BLOB_STORAGE_DIRECTORY_KEY, null); this.storageDir = BlobUtils.initStorageDirectory(storageDirectory); LOG.info("Created BLOB server storage directory {}", storageDir); + // configure the maximum number of concurrent connections + final int maxConnections = config.getInteger( + ConfigConstants.BLOB_FETCH_CONCURRENT_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); + if (maxConnections >= 1) { + this.maxConnections = maxConnections; + } + else { + LOG.warn("Invalid value for maximum connections in BLOB server: {}. Using default value of {}", + maxConnections, ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT); + this.maxConnections = ConfigConstants.DEFAULT_BLOB_FETCH_CONCURRENT; + } + + // configure the backlog of connections + int backlog = config.getInteger(ConfigConstants.BLOB_FETCH_BACKLOG_KEY, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); + if (backlog < 1) { + LOG.warn("Invalid value for BLOB connection backlog: {}. Using default value of {}", + backlog, ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG); + backlog = ConfigConstants.DEFAULT_BLOB_FETCH_BACKLOG; + } + // Add shutdown hook to delete storage directory this.shutdownHook = BlobUtils.addShutdownHook(this, LOG); + // start the server try { - this.serverSocket = new ServerSocket(0); - - start(); - - if (LOG.isInfoEnabled()) { - LOG.info(String.format("Started BLOB server on port %d", - this.serverSocket.getLocalPort())); - } + this.serverSocket = new ServerSocket(0, backlog); } catch (IOException e) { - throw new IOException("Could not create BlobServer with random port.", e); + throw new IOException("Could not create BlobServer with automatic port choice.", e); } - } - /** - * Returns the network port the BLOB server is bound to. 
The return value of this method is undefined after the BLOB - * server has been shut down. - * - * @return the network port the BLOB server is bound to - */ - public int getServerPort() { - return this.serverSocket.getLocalPort(); + // start the server thread + setName("BLOB Server listener at " + getPort()); + setDaemon(true); + start(); + + if (LOG.isInfoEnabled()) { + LOG.info("Started BLOB server at {}:{} - max concurrent requests: {} - max backlog: {}", + serverSocket.getInetAddress().getHostAddress(), getPort(), maxConnections, backlog); + } } + // -------------------------------------------------------------------------------------------- + // Path Accessors + // -------------------------------------------------------------------------------------------- + /** * Returns a file handle to the file associated with the given blob key on the blob * server. @@ -174,7 +165,7 @@ public final class BlobServer extends Thread implements BlobService { * * @return a temporary file inside the BLOB server's incoming directory */ - File getTemporaryFilename() { + File createTemporaryFilename() { return new File(BlobUtils.getIncomingDirectory(storageDir), String.format("temp-%08d", tempFileCounter.getAndIncrement())); } @@ -183,7 +174,26 @@ public final class BlobServer extends Thread implements BlobService { public void run() { try { while (!this.shutdownRequested.get()) { - new BlobConnection(this.serverSocket.accept(), this).start(); + BlobServerConnection conn = new BlobServerConnection(serverSocket.accept(), this); + try { + synchronized (activeConnections) { + while (activeConnections.size() >= maxConnections) { + activeConnections.wait(2000); + } + activeConnections.add(conn); + } + + conn.start(); + conn = null; + } + finally { + if (conn != null) { + conn.close(); + synchronized (activeConnections) { + activeConnections.remove(conn); + } + } + } } } catch (Throwable t) { @@ -206,6 +216,10 @@ public final class BlobServer extends Thread implements BlobService { catch 
(IOException ioe) { LOG.debug("Error while closing the server socket.", ioe); } + + // wake the thread up, in case it is waiting on some operation + interrupt(); + try { join(); } @@ -213,6 +227,16 @@ public final class BlobServer extends Thread implements BlobService { LOG.debug("Error while waiting for this thread to die.", ie); } + synchronized (activeConnections) { + if (!activeConnections.isEmpty()) { + for (BlobServerConnection conn : activeConnections) { + LOG.debug("Shutting down connection " + conn.getName()); + conn.close(); + } + activeConnections.clear(); + } + } + // Clean up the storage directory try { FileUtils.deleteDirectory(storageDir); @@ -255,8 +279,7 @@ public final class BlobServer extends Thread implements BlobService { final File localFile = BlobUtils.getStorageLocation(storageDir, requiredBlob); if (!localFile.exists()) { - throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does " + - "not exist."); + throw new FileNotFoundException("File " + localFile.getCanonicalPath() + " does not exist."); } else { return localFile.toURI().toURL(); } @@ -266,15 +289,17 @@ public final class BlobServer extends Thread implements BlobService { * This method deletes the file associated to the blob key if it exists in the local storage * of the blob server. 
* - * @param blobKey associated with the file to be deleted + * @param key associated with the file to be deleted * @throws IOException */ @Override - public void delete(BlobKey blobKey) throws IOException { - final File localFile = BlobUtils.getStorageLocation(storageDir, blobKey); + public void delete(BlobKey key) throws IOException { + final File localFile = BlobUtils.getStorageLocation(storageDir, key); if (localFile.exists()) { - localFile.delete(); + if (!localFile.delete()) { + LOG.warn("Failed to delete locally BLOB " + key + " at " + localFile.getAbsolutePath()); + } } } @@ -285,95 +310,35 @@ public final class BlobServer extends Thread implements BlobService { */ @Override public int getPort() { - return getServerPort(); + return this.serverSocket.getLocalPort(); } /** - * Auxiliary method to write the length of an upcoming data chunk to an - * output stream. + * Tests whether the BLOB server has been requested to shut down. * - * @param length - * the length of the upcoming data chunk in bytes - * @param buf - * the byte buffer to use for the integer serialization - * @param outputStream - * the output stream to write the length to - * @throws IOException - * thrown if an I/O error occurs while writing to the output - * stream + * @return True, if the server has been requested to shut down, false otherwise. */ - static void writeLength(final int length, final byte[] buf, - final OutputStream outputStream) throws IOException { - - buf[0] = (byte) (length & 0xff); - buf[1] = (byte) ((length >> 8) & 0xff); - buf[2] = (byte) ((length >> 16) & 0xff); - buf[3] = (byte) ((length >> 24) & 0xff); - - outputStream.write(buf, 0, 4); + public boolean isShutdown() { + return this.shutdownRequested.get(); } /** - * Auxiliary method to read the length of an upcoming data chunk from an - * input stream. 
- * - * @param buf - * the byte buffer to use for the integer deserialization - * @param inputStream - * the input stream to read the length from - * @return the length of the upcoming data chunk in bytes - * @throws IOException - * thrown if an I/O error occurs while reading from the input - * stream + * Access to the server socket, for testing */ - static int readLength(final byte[] buf, final InputStream inputStream) - throws IOException { - - int bytesRead = 0; - while (bytesRead < 4) { - final int read = inputStream.read(buf, bytesRead, 4 - bytesRead); - if (read < 0) { - throw new EOFException(); - } - bytesRead += read; - } - - bytesRead = buf[0] & 0xff; - bytesRead |= (buf[1] & 0xff) << 8; - bytesRead |= (buf[2] & 0xff) << 16; - bytesRead |= (buf[3] & 0xff) << 24; - - return bytesRead; + ServerSocket getServerSocket() { + return this.serverSocket; } - /** - * Auxiliary method to read a particular number of bytes from an input stream. This method blocks until the - * requested number of bytes have been read from the stream. If the stream cannot offer enough data, an - * {@link EOFException} is thrown. 
- * - * @param inputStream - * the input stream to read the data from - * @param buf - * the buffer to store the read data - * @param off - * the offset inside the buffer - * @param len - * the number of bytes to read from the stream - * @throws IOException - * thrown if I/O error occurs while reading from the stream or the stream cannot offer enough data - */ - static void readFully(final InputStream inputStream, - final byte[] buf, final int off, final int len) throws IOException { - - int bytesRead = 0; - while (bytesRead < len) { + void unregisterConnection(BlobServerConnection conn) { + synchronized (activeConnections) { + activeConnections.remove(conn); + activeConnections.notifyAll(); + } + } - final int read = inputStream.read(buf, off + bytesRead, len - - bytesRead); - if (read < 0) { - throw new EOFException(); - } - bytesRead += read; + List<BlobServerConnection> getCurrentyActiveConnections() { + synchronized (activeConnections) { + return new ArrayList<BlobServerConnection>(activeConnections); } } }