Updated Branches: refs/heads/trunk e72e559ba -> b28b87b58
FLUME-1937. Issue with maxUnderReplication in HDFS sink. (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/b28b87b5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/b28b87b5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/b28b87b5 Branch: refs/heads/trunk Commit: b28b87b58b2b08d31cac055cb3f9f8762f65b469 Parents: e72e559 Author: Hari Shreedharan <[email protected]> Authored: Fri Mar 8 19:00:29 2013 -0800 Committer: Hari Shreedharan <[email protected]> Committed: Fri Mar 8 19:00:29 2013 -0800 ---------------------------------------------------------------------- .../org/apache/flume/sink/hdfs/BucketWriter.java | 8 +- .../sink/hdfs/TestHDFSEventSinkOnMiniCluster.java | 107 ++++++++++++++- 2 files changed, 109 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index c11fb20..774f297 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -92,7 +92,7 @@ class BucketWriter { private volatile long batchCounter; private volatile boolean isOpen; private volatile boolean isUnderReplicated; - private volatile int consecutiveUnderReplRotateCount; + private volatile int consecutiveUnderReplRotateCount = 0; private volatile ScheduledFuture<Void> timedRollFuture; private SinkCounter sinkCounter; private final int idleTimeout; @@ -193,9 +193,6 @@ class BucketWriter 
{ return null; } }); - - // ensure new files reset under-rep rotate count - consecutiveUnderReplRotateCount = 0; } /** @@ -576,7 +573,8 @@ class BucketWriter { } catch (TimeoutException eT) { future.cancel(true); sinkCounter.incrementConnectionFailedCount(); - throw new IOException("Callable timed out after " + callTimeout + " ms", + throw new IOException("Callable timed out after " + callTimeout + " ms" + + " on file: " + bucketPath, eT); } catch (ExecutionException e1) { sinkCounter.incrementConnectionFailedCount(); http://git-wip-us.apache.org/repos/asf/flume/blob/b28b87b5/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java index c2b96f7..6e11624 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSinkOnMiniCluster.java @@ -353,7 +353,7 @@ public class TestHDFSEventSinkOnMiniCluster { Assert.assertTrue(line.startsWith("yarg")); } - Assert.assertTrue("4 or 5 files expected", + Assert.assertTrue("4 or 5 files expected, found " + statuses.length, statuses.length == 4 || statuses.length == 5); System.out.println("There are " + statuses.length + " files."); @@ -365,6 +365,111 @@ public class TestHDFSEventSinkOnMiniCluster { cluster = null; } + /** + * Writes 50 events (one per batch), stops one of the three datanodes partway through, and verifies how many files the sink rolls while blocks are under-replicated.
+ */ + @Test + public void maxUnderReplicationTest() throws EventDeliveryException, + IOException { + Configuration conf = new Configuration(); + conf.set("dfs.replication", String.valueOf(3)); + cluster = new MiniDFSCluster(conf, 3, true, null); + cluster.waitActive(); + + String outputDir = "/flume/underReplicationTest"; + Path outputDirPath = new Path(outputDir); + + logger.info("Running test with output dir: {}", outputDir); + + FileSystem fs = cluster.getFileSystem(); + // ensure output directory is empty + if (fs.exists(outputDirPath)) { + fs.delete(outputDirPath, true); + } + + String nnURL = getNameNodeURL(cluster); + logger.info("Namenode address: {}", nnURL); + + Context chanCtx = new Context(); + MemoryChannel channel = new MemoryChannel(); + channel.setName("simpleHDFSTest-mem-chan"); + channel.configure(chanCtx); + channel.start(); + + Context sinkCtx = new Context(); + sinkCtx.put("hdfs.path", nnURL + outputDir); + sinkCtx.put("hdfs.fileType", HDFSWriterFactory.DataStreamType); + sinkCtx.put("hdfs.batchSize", Integer.toString(1)); + + HDFSEventSink sink = new HDFSEventSink(); + sink.setName("simpleHDFSTest-hdfs-sink"); + sink.configure(sinkCtx); + sink.setChannel(channel); + sink.start(); + + // create an event + channel.getTransaction().begin(); + try { + for (int i = 0; i < 50; i++) { + channel.put(EventBuilder.withBody("yarg " + i, Charsets.UTF_8)); + } + channel.getTransaction().commit(); + } finally { + channel.getTransaction().close(); + } + + // store events to HDFS + logger.info("Running process(). Create new file."); + sink.process(); // create new file; + logger.info("Running process(). Same file."); + sink.process(); + + // kill a datanode + logger.info("Killing datanode #1..."); + cluster.stopDataNode(0); + + // there is a race here.. the client may or may not notice that the + // datanode is dead before it next sync()s. + // so, this next call may or may not roll a new file. + + logger.info("Running process(). Create new file? 
(racy)"); + sink.process(); + + for (int i = 3; i < 50; i++) { + logger.info("Running process()."); + sink.process(); + } + + // shut down flume + sink.stop(); + channel.stop(); + + // verify that it's in HDFS and that its content is what we say it should be + FileStatus[] statuses = fs.listStatus(outputDirPath); + Assert.assertNotNull("No files found written to HDFS", statuses); + + for (FileStatus status : statuses) { + Path filePath = status.getPath(); + logger.info("Found file on DFS: {}", filePath); + FSDataInputStream stream = fs.open(filePath); + BufferedReader reader = new BufferedReader(new InputStreamReader(stream)); + String line = reader.readLine(); + logger.info("First line in file {}: {}", filePath, line); + Assert.assertTrue(line.startsWith("yarg")); + } + + System.out.println("There are " + statuses.length + " files."); + Assert.assertEquals("31 files expected, found " + statuses.length, + 31, statuses.length); + + if (!KEEP_DATA) { + fs.delete(outputDirPath, true); + } + + cluster.shutdown(); + cluster = null; + } + @AfterClass public static void teardownClass() { // restore system state, if needed
