Updated Branches: refs/heads/flume-1.3.0 c9fcf2e09 -> 6ea3a1596
FLUME-1645. Add hdfs.fileSuffix property to HDFSEventSink. (Steve Hoffman via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/6ea3a159 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/6ea3a159 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/6ea3a159 Branch: refs/heads/flume-1.3.0 Commit: 6ea3a1596ee8a6052c08032c12d87660ff15dbf2 Parents: c9fcf2e Author: Hari Shreedharan <[email protected]> Authored: Fri Oct 26 18:10:27 2012 -0400 Committer: Hari Shreedharan <[email protected]> Committed: Fri Oct 26 18:12:46 2012 -0400 ---------------------------------------------------------------------- .../src/main/java/org/apache/flume/Clock.java | 28 ++++++ .../main/java/org/apache/flume/SystemClock.java | 30 ++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + .../org/apache/flume/sink/hdfs/BucketWriter.java | 24 +++-- .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 11 +- .../org/apache/flume/sink/hdfs/MockHDFSWriter.java | 13 ++- .../apache/flume/sink/hdfs/TestBucketWriter.java | 76 +++++++++++++-- 7 files changed, 154 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-core/src/main/java/org/apache/flume/Clock.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/Clock.java b/flume-ng-core/src/main/java/org/apache/flume/Clock.java new file mode 100644 index 0000000..fc719bc --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/Clock.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume; + +/** + * Facade for System.currentTimeMillis for Testing + */ +public interface Clock { + + public long currentTimeMillis(); + +} http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java new file mode 100644 index 0000000..f176807 --- /dev/null +++ b/flume-ng-core/src/main/java/org/apache/flume/SystemClock.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.flume; + +/** + * Default implementation of Clock which uses System + */ +public class SystemClock implements Clock { + + public long currentTimeMillis() { + return System.currentTimeMillis(); + } + +} http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 9604b78..c4316ad 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -1066,6 +1066,7 @@ Name Default Description **type** -- The component type name, needs to be ``hdfs`` **hdfs.path** -- HDFS directory path (eg hdfs://namenode/flume/webdata/) hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory +hdfs.fileSuffix -- Suffix to append to file (eg ``.avro`` - *NOTE: period is not automatically added*) hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval) hdfs.rollSize 1024 File size to trigger roll, in bytes (0: never roll based on file size) http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java index bce8e11..9f2c763 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/BucketWriter.java @@ -27,8 +27,10 @@ import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; +import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; +import org.apache.flume.SystemClock; import org.apache.flume.instrumentation.SinkCounter; import org.apache.flume.sink.FlumeFormatter; import org.apache.hadoop.conf.Configuration; @@ -52,7 +54,7 @@ class BucketWriter { private static final Logger LOG = LoggerFactory .getLogger(BucketWriter.class); - private static final String IN_USE_EXT = ".tmp"; + static final String IN_USE_EXT = ".tmp"; /** * This lock ensures that only one thread can open a file at a time. */ @@ -78,14 +80,17 @@ class BucketWriter { private FileSystem fileSystem; private volatile String filePath; + private volatile String fileSuffix; private volatile String bucketPath; private volatile long batchCounter; private volatile boolean isOpen; private volatile ScheduledFuture<Void> timedRollFuture; private SinkCounter sinkCounter; + private Clock clock = new SystemClock(); + BucketWriter(long rollInterval, long rollSize, long rollCount, long batchSize, - Context context, String filePath, CompressionCodec codeC, + Context context, String filePath, String fileSuffix, CompressionCodec codeC, CompressionType compType, HDFSWriter writer, FlumeFormatter formatter, ScheduledExecutorService timedRollerPool, UserGroupInformation user, SinkCounter sinkCounter) { @@ -95,6 +100,7 @@ class BucketWriter { this.batchSize = batchSize; this.context = context; this.filePath = filePath; + this.fileSuffix = fileSuffix; this.codeC = codeC; this.compType = compType; this.writer = writer; @@ -103,7 +109,7 @@ class BucketWriter { this.user = user; this.sinkCounter = sinkCounter; - fileExtensionCounter = new AtomicLong(System.currentTimeMillis()); + fileExtensionCounter = new AtomicLong(clock.currentTimeMillis()); isOpen = false; writer.configure(context); @@ -152,7 +158,6 @@ class BucketWriter { */ private void open() throws IOException, InterruptedException { runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override public Void run() throws Exception { doOpen(); return null; @@ -183,6 +188,10 @@ class BucketWriter { long counter = fileExtensionCounter.incrementAndGet(); if (codeC == null) { bucketPath = filePath + "." + counter; + // FLUME-1645 - add suffix if specified + if (fileSuffix != null && fileSuffix.length() > 0) { + bucketPath += fileSuffix; + } // Need to get reference to FS using above config before underlying // writer does in order to avoid shutdown hook & IllegalStateExceptions fileSystem = new Path(bucketPath).getFileSystem(config); @@ -211,7 +220,6 @@ class BucketWriter { // if time-based rolling is enabled, schedule the roll if (rollInterval > 0) { Callable<Void> action = new Callable<Void>() { - @Override public Void call() throws Exception { LOG.debug("Rolling file ({}): Roll scheduled after {} sec elapsed.", bucketPath + IN_USE_EXT, rollInterval); @@ -238,7 +246,6 @@ class BucketWriter { public synchronized void close() throws IOException, InterruptedException { flush(); runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override public Void run() throws Exception { doClose(); return null; @@ -284,7 +291,6 @@ class BucketWriter { public synchronized void flush() throws IOException, InterruptedException { if (!isBatchComplete()) { runPrivileged(new PrivilegedExceptionAction<Void>() { - @Override public Void run() throws Exception { doFlush(); return null; @@ -390,4 +396,8 @@ class BucketWriter { private boolean isBatchComplete() { return (batchCounter == 0); } + + void setClock(Clock clock) { + this.clock = clock; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index a6d624b..e369604 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -69,6 +69,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private static final long defaultRollSize = 1024; private static final long defaultRollCount = 10; private static final String defaultFileName = "FlumeData"; + private static final String defaultSuffix = ""; private static final long defaultBatchSize = 100; private static final long defaultTxnEventMax = 100; private static final String defaultFileType = HDFSWriterFactory.SequenceFileType; @@ -108,6 +109,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private CompressionType compType; private String fileType; private String path; + private String suffix; private TimeZone timeZone; private int maxOpenFiles; private String writeFormat; @@ -170,13 +172,14 @@ public class HDFSEventSink extends AbstractSink implements Configurable { } // read configuration and setup thresholds - @Override public void configure(Context context) { this.context = context; String dirpath = Preconditions.checkNotNull( context.getString("hdfs.path"), "hdfs.path is required"); String fileName = context.getString("hdfs.filePrefix", defaultFileName); + // FLUME-1645: add suffix support + this.suffix = context.getString("hdfs.fileSuffix", defaultSuffix); this.path = dirpath + System.getProperty("file.separator") + fileName; String tzName = context.getString("hdfs.timeZone"); timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName); @@ -370,7 +373,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { * HDFS. <br/> * This method is not thread safe. */ - @Override public Status process() throws EventDeliveryException { Channel channel = getChannel(); Transaction transaction = channel.getTransaction(); @@ -396,7 +398,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { .getFormatter(writeFormat); bucketWriter = new BucketWriter(rollInterval, rollSize, rollCount, - batchSize, context, realPath, codeC, compType, hdfsWriter, + batchSize, context, realPath, suffix, codeC, compType, hdfsWriter, formatter, timedRollerPool, proxyTicket, sinkCounter); sfWriters.put(realPath, bucketWriter); @@ -706,7 +708,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { // Write the data to HDFS callWithTimeout(new Callable<Void>() { - @Override public Void call() throws Exception { bucketWriter.append(event); return null; @@ -721,7 +722,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { throws IOException, InterruptedException { callWithTimeout(new Callable<Void>() { - @Override public Void call() throws Exception { bucketWriter.flush(); return null; @@ -736,7 +736,6 @@ public class HDFSEventSink extends AbstractSink implements Configurable { throws IOException, InterruptedException { callWithTimeout(new Callable<Void>() { - @Override public Void call() throws Exception { bucketWriter.close(); return null; http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java index 24a7cbf..0b7910a 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/MockHDFSWriter.java @@ -31,6 +31,7 @@ public class MockHDFSWriter implements HDFSWriter { private int filesClosed = 0; private int bytesWritten = 0; private int eventsWritten = 0; + private String filePath = null; public int getFilesOpened() { return filesOpened; @@ -48,6 +49,10 @@ public class MockHDFSWriter implements HDFSWriter { return eventsWritten; } + public String getOpenedFilePath() { + return filePath; + } + public void clear() { filesOpened = 0; filesClosed = 0; @@ -55,33 +60,29 @@ public class MockHDFSWriter implements HDFSWriter { eventsWritten = 0; } - @Override public void configure(Context context) { // no-op } - @Override public void open(String filePath, FlumeFormatter fmt) throws IOException { + this.filePath = filePath; filesOpened++; } - @Override public void open(String filePath, CompressionCodec codec, CompressionType cType, FlumeFormatter fmt) throws IOException { + this.filePath = filePath; filesOpened++; } - @Override public void append(Event e, FlumeFormatter fmt) throws IOException { eventsWritten++; bytesWritten += e.getBody().length; } - @Override public void sync() throws IOException { // does nothing } - @Override public void close() throws IOException { filesClosed++; } http://git-wip-us.apache.org/repos/asf/flume/blob/6ea3a159/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java index 60f1830..6a8072e 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestBucketWriter.java @@ -23,6 +23,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.flume.Clock; import org.apache.flume.Context; import org.apache.flume.Event; import org.apache.flume.event.EventBuilder; @@ -67,7 +68,7 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextFormatter formatter = new HDFSTextFormatter(); BucketWriter bucketWriter = new BucketWriter(0, 0, maxEvents, 0, ctx, - "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter, + "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); @@ -91,7 +92,7 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextFormatter formatter = new HDFSTextFormatter(); BucketWriter bucketWriter = new BucketWriter(0, maxBytes, 0, 0, ctx, - "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter, + "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); @@ -117,7 +118,7 @@ public class TestBucketWriter { MockHDFSWriter hdfsWriter = new MockHDFSWriter(); HDFSTextFormatter formatter = new HDFSTextFormatter(); BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, - "/tmp/file", null, SequenceFile.CompressionType.NONE, hdfsWriter, + "/tmp/file", null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); @@ -166,30 +167,30 @@ public class TestBucketWriter { HDFSWriter hdfsWriter = new HDFSWriter() { private volatile boolean open = false; - @Override + public void configure(Context context) { } - @Override + public void sync() throws IOException { if(!open) { throw new IOException("closed"); } } - @Override + public void open(String filePath, CompressionCodec codec, CompressionType cType, FlumeFormatter fmt) throws IOException { open = true; } - @Override + public void open(String filePath, FlumeFormatter fmt) throws IOException { open = true; } - @Override + public void close() throws IOException { open = false; } - @Override + public void append(Event e, FlumeFormatter fmt) throws IOException { // we just re-open in append if closed open = true; @@ -199,7 +200,7 @@ public class TestBucketWriter { File tmpFile = File.createTempFile("flume", "test"); tmpFile.deleteOnExit(); BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, - tmpFile.getName(), null, SequenceFile.CompressionType.NONE, hdfsWriter, + tmpFile.getName(), null, null, SequenceFile.CompressionType.NONE, hdfsWriter, formatter, timedRollerPool, null, new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); @@ -214,4 +215,59 @@ public class TestBucketWriter { bucketWriter.flush(); // throws closed exception } + @Test + public void testFileSuffixNotGiven() throws IOException, InterruptedException { + final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test + final String suffix = null; + + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + HDFSTextFormatter formatter = new HDFSTextFormatter(); + BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, + "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, + formatter, timedRollerPool, null, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + + // Need to override system time use for test so we know what to expect + final long testTime = System.currentTimeMillis(); + Clock testClock = new Clock() { + public long currentTimeMillis() { + return testTime; + } + }; + bucketWriter.setClock(testClock); + + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + bucketWriter.append(e); + + Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + BucketWriter.IN_USE_EXT)); + } + + @Test + public void testFileSuffixGiven() throws IOException, InterruptedException { + final int ROLL_INTERVAL = 1000; // seconds. Make sure it doesn't change in course of test + final String suffix = ".avro"; + + MockHDFSWriter hdfsWriter = new MockHDFSWriter(); + HDFSTextFormatter formatter = new HDFSTextFormatter(); + BucketWriter bucketWriter = new BucketWriter(ROLL_INTERVAL, 0, 0, 0, ctx, + "/tmp/file", suffix, null, SequenceFile.CompressionType.NONE, hdfsWriter, + formatter, timedRollerPool, null, + new SinkCounter("test-bucket-writer-" + System.currentTimeMillis())); + + // Need to override system time use for test so we know what to expect + + final long testTime = System.currentTimeMillis(); + + Clock testClock = new Clock() { + public long currentTimeMillis() { + return testTime; + } + }; + bucketWriter.setClock(testClock); + + Event e = EventBuilder.withBody("foo", Charsets.UTF_8); + bucketWriter.append(e); + + Assert.assertTrue("Incorrect suffix", hdfsWriter.getOpenedFilePath().endsWith(Long.toString(testTime+1) + suffix + BucketWriter.IN_USE_EXT)); + } }
