Updated Branches:
  refs/heads/flume-1.4 bedfc4e9b -> 67d44cd84
FLUME-1925. HDFS timeouts should not starve other threads. (Hari Shreedharan via Mike Percy)


Project: http://git-wip-us.apache.org/repos/asf/flume/repo
Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/67d44cd8
Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/67d44cd8
Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/67d44cd8

Branch: refs/heads/flume-1.4
Commit: 67d44cd84074ccfbc0ab295053852ea3acdc52dc
Parents: bedfc4e
Author: Mike Percy <[email protected]>
Authored: Thu Feb 28 15:06:13 2013 -0800
Committer: Mike Percy <[email protected]>
Committed: Thu Feb 28 15:06:49 2013 -0800

----------------------------------------------------------------------
 .../org/apache/flume/sink/hdfs/BucketWriter.java  | 145 +++++++++++----
 .../org/apache/flume/sink/hdfs/HDFSEventSink.java |  99 +----------
 .../apache/flume/sink/hdfs/TestBucketWriter.java  |  16 +-
 3 files changed, 126 insertions(+), 134 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
index 0786857..cdc37f6 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java
@@ -21,9 +21,14 @@ package org.apache.flume.sink.hdfs;
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.concurrent.Callable;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.flume.Clock;
@@ -34,7 +39,6 @@ import org.apache.flume.instrumentation.SinkCounter;
 import org.apache.flume.sink.hdfs.HDFSEventSink.WriterCallback;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.SequenceFile.CompressionType;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -93,6 +97,8 @@ class BucketWriter {
   private volatile ScheduledFuture<Void> idleFuture;
   private final WriterCallback onIdleCallback;
   private final String onIdleCallbackPath;
+  private final long callTimeout;
+  private final ExecutorService callTimeoutPool;
 
   private Clock clock = new SystemClock();
 
@@ -101,12 +107,13 @@ class BucketWriter {
   protected boolean idleClosed = false;
 
   BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize,
-      Context context, String filePath, String fileName, String inUsePrefix,
-      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
-      CompressionType compType, HDFSWriter writer,
-      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
-      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
-      String onIdleCallbackPath) {
+      Context context, String filePath, String fileName, String inUsePrefix,
+      String inUseSuffix, String fileSuffix, CompressionCodec codeC,
+      CompressionType compType, HDFSWriter writer,
+      ScheduledExecutorService timedRollerPool, UserGroupInformation user,
+      SinkCounter sinkCounter, int idleTimeout, WriterCallback onIdleCallback,
+      String onIdleCallbackPath, long callTimeout,
+      ExecutorService callTimeoutPool) {
     this.rollInterval = rollInterval;
     this.rollSize = rollSize;
     this.rollCount = rollCount;
@@ -125,7 +132,8 @@ class BucketWriter {
     this.idleTimeout = idleTimeout;
     this.onIdleCallback = onIdleCallback;
     this.onIdleCallbackPath = onIdleCallbackPath;
-
+    this.callTimeout = callTimeout;
+    this.callTimeoutPool = callTimeoutPool;
     fileExtensionCounter = new AtomicLong(clock.currentTimeMillis());
 
     isOpen = false;
@@ -193,7 +201,7 @@ class BucketWriter {
       throw new IOException("Invalid file settings");
     }
 
-    Configuration config = new Configuration();
+    final Configuration config = new Configuration();
     // disable FileSystem JVM shutdown hook
     config.setBoolean("fs.automatic.close", false);
 
@@ -223,16 +231,22 @@ class BucketWriter {
         targetPath = filePath + DIRECTORY_DELIMITER + fullFileName;
 
       LOG.info("Creating " + bucketPath);
-      if (codeC == null) {
-        // Need to get reference to FS using above config before underlying
-        // writer does in order to avoid shutdown hook & IllegalStateExceptions
-        fileSystem = new Path(bucketPath).getFileSystem(config);
-        writer.open(bucketPath);
-      } else {
-        // need to get reference to FS before writer does to avoid shutdown hook
-        fileSystem = new Path(bucketPath).getFileSystem(config);
-        writer.open(bucketPath, codeC, compType);
-      }
+      callWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          if (codeC == null) {
+            // Need to get reference to FS using above config before underlying
+            // writer does in order to avoid shutdown hook & IllegalStateExceptions
+            fileSystem = new Path(bucketPath).getFileSystem(config);
+            writer.open(bucketPath);
+          } else {
+            // need to get reference to FS before writer does to avoid shutdown hook
+            fileSystem = new Path(bucketPath).getFileSystem(config);
+            writer.open(bucketPath, codeC, compType);
+          }
+          return null;
+        }
+      });
     } catch (Exception ex) {
       sinkCounter.incrementConnectionFailedCount();
       if (ex instanceof IOException) {
@@ -287,11 +301,17 @@ class BucketWriter {
    * doClose() must only be called by close()
    * @throws IOException
    */
-  private void doClose() throws IOException {
+  private void doClose() throws IOException, InterruptedException {
     LOG.debug("Closing {}", bucketPath);
     if (isOpen) {
       try {
-        writer.close(); // could block
+        callWithTimeout(new Callable<Void>() {
+          @Override
+          public Void call() throws Exception {
+            writer.close(); // could block
+            return null;
+          }
+        });
         sinkCounter.incrementConnectionClosedCount();
       } catch (IOException e) {
         LOG.warn("failed to close() HDFSWriter for file (" + bucketPath +
@@ -361,8 +381,14 @@ class BucketWriter {
    * doFlush() must only be called by flush()
    * @throws IOException
    */
-  private void doFlush() throws IOException {
-    writer.sync(); // could block
+  private void doFlush() throws IOException, InterruptedException {
+    callWithTimeout(new Callable<Void>() {
+      @Override
+      public Void call() throws Exception {
+        writer.sync(); // could block
+        return null;
+      }
+    });
     batchCounter = 0;
   }
 
@@ -378,7 +404,7 @@ class BucketWriter {
    * @throws IOException
    * @throws InterruptedException
    */
-  public synchronized void append(Event event)
+  public synchronized void append(final Event event)
       throws IOException, InterruptedException {
     checkAndThrowInterruptedException();
     if (!isOpen) {
@@ -398,7 +424,13 @@ class BucketWriter {
     // write the event
     try {
       sinkCounter.incrementEventDrainAttemptCount();
-      writer.append(event); // could block
+      callWithTimeout(new Callable<Void>() {
+        @Override
+        public Void call() throws Exception {
+          writer.append(event); // could block
+          return null;
+        }
+      });
     } catch (IOException e) {
       LOG.warn("Caught IOException writing to HDFSWriter ({}). Closing file ("
           + bucketPath + ") and rethrowing exception.",
@@ -444,18 +476,24 @@ class BucketWriter {
   /**
    * Rename bucketPath file from .tmp to permanent location.
    */
-  private void renameBucket() throws IOException {
+  private void renameBucket() throws IOException, InterruptedException {
     if(bucketPath.equals(targetPath)) {
       return;
     }
 
-    Path srcPath = new Path(bucketPath);
-    Path dstPath = new Path(targetPath);
+    final Path srcPath = new Path(bucketPath);
+    final Path dstPath = new Path(targetPath);
 
-    if(fileSystem.exists(srcPath)) { // could block
-      LOG.info("Renaming " + srcPath + " to " + dstPath);
-      fileSystem.rename(srcPath, dstPath); // could block
-    }
+    callWithTimeout(new Callable<Object>() {
+      @Override
+      public Object call() throws Exception {
+        if(fileSystem.exists(srcPath)) { // could block
+          LOG.info("Renaming " + srcPath + " to " + dstPath);
+          fileSystem.rename(srcPath, dstPath); // could block
+        }
+        return null;
+      }
+    });
   }
 
   @Override
@@ -485,4 +523,47 @@ class BucketWriter {
           + "taking too long.");
     }
   }
+
+  /**
+   * Execute the callable on a separate thread and wait for the completion
+   * for the specified amount of time in milliseconds. In case of timeout
+   * cancel the callable and throw an IOException
+   */
+  private <T> T callWithTimeout(Callable<T> callable)
+      throws IOException, InterruptedException {
+    Future<T> future = callTimeoutPool.submit(callable);
+    try {
+      if (callTimeout > 0) {
+        return future.get(callTimeout, TimeUnit.MILLISECONDS);
+      } else {
+        return future.get();
+      }
+    } catch (TimeoutException eT) {
+      future.cancel(true);
+      sinkCounter.incrementConnectionFailedCount();
+      throw new IOException("Callable timed out after " + callTimeout + " ms",
+          eT);
+    } catch (ExecutionException e1) {
+      sinkCounter.incrementConnectionFailedCount();
+      Throwable cause = e1.getCause();
+      if (cause instanceof IOException) {
+        throw (IOException) cause;
+      } else if (cause instanceof InterruptedException) {
+        throw (InterruptedException) cause;
+      } else if (cause instanceof RuntimeException) {
+        throw (RuntimeException) cause;
+      } else if (cause instanceof Error) {
+        throw (Error)cause;
+      } else {
+        throw new RuntimeException(e1);
+      }
+    } catch (CancellationException ce) {
+      throw new InterruptedException(
+          "Blocked callable interrupted by rotation event");
+    } catch (InterruptedException ex) {
+      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
+      throw ex;
+    }
+  }
+
 }


http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
index 76e3d1f..741ac90 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java
@@ -26,15 +26,10 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.TimeZone;
 import java.util.Map.Entry;
-import java.util.concurrent.Callable;
-import java.util.concurrent.CancellationException;
-import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicReference;
 
 import com.google.common.annotations.VisibleForTesting;
@@ -81,7 +76,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
   private static final String defaultInUsePrefix = "";
   private static final String defaultInUseSuffix = ".tmp";
   private static final long defaultBatchSize = 100;
-  private static final long defaultTxnEventMax = 100;
   private static final String defaultFileType =
       HDFSWriterFactory.SequenceFileType;
   private static final int defaultMaxOpenFiles = 5000;
@@ -336,47 +330,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
     return codec;
   }
 
-  /**
-   * Execute the callable on a separate thread and wait for the completion
-   * for the specified amount of time in milliseconds. In case of timeout
-   * cancel the callable and throw an IOException
-   */
-  private <T> T callWithTimeout(Callable<T> callable)
-      throws IOException, InterruptedException {
-    Future<T> future = callTimeoutPool.submit(callable);
-    try {
-      if (callTimeout > 0) {
-        return future.get(callTimeout, TimeUnit.MILLISECONDS);
-      } else {
-        return future.get();
-      }
-    } catch (TimeoutException eT) {
-      future.cancel(true);
-      sinkCounter.incrementConnectionFailedCount();
-      throw new IOException("Callable timed out after " + callTimeout + " ms",
-          eT);
-    } catch (ExecutionException e1) {
-      sinkCounter.incrementConnectionFailedCount();
-      Throwable cause = e1.getCause();
-      if (cause instanceof IOException) {
-        throw (IOException) cause;
-      } else if (cause instanceof InterruptedException) {
-        throw (InterruptedException) cause;
-      } else if (cause instanceof RuntimeException) {
-        throw (RuntimeException) cause;
-      } else if (cause instanceof Error) {
-        throw (Error)cause;
-      } else {
-        throw new RuntimeException(e1);
-      }
-    } catch (CancellationException ce) {
-      throw new InterruptedException(
-          "Blocked callable interrupted by rotation event");
-    } catch (InterruptedException ex) {
-      LOG.warn("Unexpected Exception " + ex.getMessage(), ex);
-      throw ex;
-    }
-  }
 
   /**
    * Pull events out of channel and send it to HDFS. Take at most batchSize
@@ -423,7 +376,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
           bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount,
               batchSize, context, realPath, realName, inUsePrefix, inUseSuffix,
               suffix, codeC, compType, hdfsWriter, timedRollerPool,
-              proxyTicket, sinkCounter, idleTimeout, idleCallback, lookupPath);
+              proxyTicket, sinkCounter, idleTimeout, idleCallback,
+              lookupPath, callTimeout, callTimeoutPool);
 
           sfWriters.put(lookupPath, bucketWriter);
         }
@@ -434,7 +388,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         }
 
         // Write the data to HDFS
-        append(bucketWriter, event);
+        bucketWriter.append(event);
       }
 
       if (txnEventCount == 0) {
@@ -447,7 +401,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
 
       // flush all pending buckets before committing the transaction
       for (BucketWriter bucketWriter : writers) {
-        flush(bucketWriter);
+        bucketWriter.flush();
       }
 
      transaction.commit();
@@ -482,7 +436,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
       LOG.info("Closing {}", entry.getKey());
 
       try {
-        close(entry.getValue());
+        entry.getValue().close();
      } catch (Exception ex) {
        LOG.warn("Exception while closing " + entry.getKey() + ". " +
            "Exception follows.", ex);
@@ -727,49 +681,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable {
         " }";
   }
 
-  /**
-   * Append to bucket writer with timeout enforced
-   */
-  private void append(final BucketWriter bucketWriter, final Event event)
-      throws IOException, InterruptedException {
-
-    // Write the data to HDFS
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.append(event);
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Flush bucket writer with timeout enforced
-   */
-  private void flush(final BucketWriter bucketWriter)
-      throws IOException, InterruptedException {
-
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.flush();
-        return null;
-      }
-    });
-  }
-
-  /**
-   * Close bucket writer with timeout enforced
-   */
-  private void close(final BucketWriter bucketWriter)
-      throws IOException, InterruptedException {
-
-    callWithTimeout(new Callable<Void>() {
-      public Void call() throws Exception {
-        bucketWriter.close();
-        return null;
-      }
-    });
-  }
-
   @VisibleForTesting
   void setBucketClock(Clock clock) {
     BucketPath.setClock(clock);


http://git-wip-us.apache.org/repos/asf/flume/blob/67d44cd8/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
----------------------------------------------------------------------
diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
index ebe277c..99b6150 100644
--- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
+++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java
@@ -69,7 +69,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()), 0,
-        null, null);
+        null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -93,7 +93,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < 1000; i++) {
@@ -119,7 +119,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     long startNanos = System.nanoTime();
@@ -205,7 +205,7 @@ public class TestBucketWriter {
         path, name, "", ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     for (int i = 0; i < NUM_EVENTS - 1; i++) {
@@ -228,7 +228,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     // Need to override system time use for test so we know what to expect
     final long testTime = System.currentTimeMillis();
@@ -255,7 +255,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", ".tmp", suffix, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     // Need to override system time use for test so we know what to expect
 
@@ -285,7 +285,7 @@ public class TestBucketWriter {
         "/tmp", "file", PREFIX, ".tmp", null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
@@ -304,7 +304,7 @@ public class TestBucketWriter {
         "/tmp", "file", "", SUFFIX, null, null, SequenceFile.CompressionType.NONE,
         hdfsWriter, timedRollerPool, null,
         new SinkCounter("test-bucket-writer-" + System.currentTimeMillis()),
-        0, null, null);
+        0, null, null, 30000, Executors.newSingleThreadExecutor());
 
     Event e = EventBuilder.withBody("foo", Charsets.UTF_8);
     bucketWriter.append(e);
