FLUME-1509. HDFS sink should allow for the use of different timezones when resolving sink paths.
(Jonathan Natkins via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/d1b65144 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/d1b65144 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/d1b65144 Branch: refs/heads/cdh-1.2.0+24_intuit Commit: d1b6514492bfe536b29ecf8c311ff230d34d5e1c Parents: 64a7586 Author: Hari Shreedharan <[email protected]> Authored: Tue Sep 4 15:34:05 2012 -0700 Committer: Mike Percy <[email protected]> Committed: Fri Sep 7 14:03:07 2012 -0700 ---------------------------------------------------------------------- .../apache/flume/formatter/output/BucketPath.java | 32 +++++++++++++- .../flume/formatter/output/TestBucketPath.java | 15 +++++++ flume-ng-doc/sphinx/FlumeUserGuide.rst | 1 + .../org/apache/flume/sink/hdfs/HDFSEventSink.java | 6 ++- .../apache/flume/sink/hdfs/TestHDFSEventSink.java | 1 + 5 files changed, 51 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/d1b65144/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java index cf105c7..fcc26f2 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java @@ -23,6 +23,7 @@ import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -116,6 +117,16 @@ public class BucketPath { } /** + * A wrapper around + * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)} + * with the timezone set to the default. + */ + public static String replaceShorthand(char c, Map<String, String> headers, + boolean needRounding, int unit, int roundDown) { + return replaceShorthand(c, headers, null, needRounding, unit, roundDown); + } + + /** * Hardcoded lookups for %x style escape replacement. Add your own! * * All shorthands are Date format strings, currently. @@ -125,6 +136,7 @@ public class BucketPath { * Dates follow the same format as unix date, with a few exceptions. * @param c - The character to replace. * @param headers - Event headers + * @param timeZone - The timezone to use for formatting the timestamp * @param needRounding - Should the timestamp be rounded down? * @param unit - if needRounding is true, what unit to round down to. This * must be one of the units specified by {@link java.util.Calendar} - @@ -138,7 +150,7 @@ public class BucketPath { * @return */ public static String replaceShorthand(char c, Map<String, String> headers, - boolean needRounding, int unit, int roundDown) { + TimeZone timeZone, boolean needRounding, int unit, int roundDown) { String timestampHeader = headers.get("timestamp"); long ts; @@ -229,6 +241,10 @@ public class BucketPath { } SimpleDateFormat format = new SimpleDateFormat(formatString); + if (timeZone != null) { + format.setTimeZone(timeZone); + } + Date date = new Date(ts); return format.format(date); } @@ -272,6 +288,16 @@ public class BucketPath { } /** + * A wrapper around + * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)} + * with the timezone set to the default. + */ + public static String escapeString(String in, Map<String, String> headers, + boolean needRounding, int unit, int roundDown) { + return escapeString(in, headers, null, needRounding, unit, roundDown); + } + + /** * Replace all substrings of form %{tagname} with get(tagname).toString() and * all shorthand substrings of form %x with a special value. * @@ -293,7 +319,7 @@ public class BucketPath { * @return Escaped string. */ public static String escapeString(String in, Map<String, String> headers, - boolean needRounding, int unit, int roundDown) { + TimeZone timeZone, boolean needRounding, int unit, int roundDown) { Matcher matcher = tagPattern.matcher(in); StringBuffer sb = new StringBuffer(); while (matcher.find()) { @@ -314,7 +340,7 @@ public class BucketPath { && matcher.group(1).length() == 1, "Expected to match single character tag in string " + in); char c = matcher.group(1).charAt(0); - replacement = replaceShorthand(c, headers, + replacement = replaceShorthand(c, headers, timeZone, needRounding, unit, roundDown); } http://git-wip-us.apache.org/repos/asf/flume/blob/d1b65144/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java index 86f3293..090b3a8 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java +++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java @@ -24,6 +24,7 @@ import java.util.Calendar; import java.util.Date; import java.util.HashMap; import java.util.Map; +import java.util.TimeZone; import org.junit.Assert; import org.junit.Before; @@ -97,4 +98,18 @@ public class TestBucketPath { Assert.assertEquals(expectedString, escapedString); } + @Test + public void testDateFormatTimeZone(){ + TimeZone utcTimeZone = TimeZone.getTimeZone("UTC"); + String test = "%c"; + String escapedString = BucketPath.escapeString( + test, headers, utcTimeZone, false, Calendar.HOUR_OF_DAY, 12); + System.out.println("Escaped String: " + escapedString); + SimpleDateFormat format = new SimpleDateFormat("EEE MMM d HH:mm:ss yyyy"); + format.setTimeZone(utcTimeZone); + Date d = new Date(cal.getTimeInMillis()); + String expectedString = format.format(d); + System.out.println("Expected String: "+ expectedString); + Assert.assertEquals(expectedString, escapedString); + } } http://git-wip-us.apache.org/repos/asf/flume/blob/d1b65144/flume-ng-doc/sphinx/FlumeUserGuide.rst ---------------------------------------------------------------------- diff --git a/flume-ng-doc/sphinx/FlumeUserGuide.rst b/flume-ng-doc/sphinx/FlumeUserGuide.rst index 1fb549b..5928ad3 100644 --- a/flume-ng-doc/sphinx/FlumeUserGuide.rst +++ b/flume-ng-doc/sphinx/FlumeUserGuide.rst @@ -984,6 +984,7 @@ Name Default Description **channel** -- **type** -- The component type name, needs to be ``hdfs`` **hdfs.path** -- HDFS directory path (eg hdfs://namenode/flume/webdata/) +hdfs.timeZone Local Time Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles. hdfs.filePrefix FlumeData Name prefixed to files created by Flume in hdfs directory hdfs.rollInterval 30 Number of seconds to wait before rolling current file (0 = never roll based on time interval) http://git-wip-us.apache.org/repos/asf/flume/blob/d1b65144/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java index fcb9642..9a76ecb 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSEventSink.java @@ -24,6 +24,7 @@ import java.util.ArrayList; import java.util.Calendar; import java.util.LinkedHashMap; import java.util.List; +import java.util.TimeZone; import java.util.Map.Entry; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; @@ -108,6 +109,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { private CompressionType compType; private String fileType; private String path; + private TimeZone timeZone; private int maxOpenFiles; private String writeFormat; private ExecutorService callTimeoutPool; @@ -177,6 +179,8 @@ public class HDFSEventSink extends AbstractSink implements Configurable { context.getString("hdfs.path"), "hdfs.path is required"); String fileName = context.getString("hdfs.filePrefix", defaultFileName); this.path = dirpath + System.getProperty("file.separator") + fileName; + String tzName = context.getString("hdfs.timeZone"); + timeZone = tzName == null ? null : TimeZone.getTimeZone(tzName); rollInterval = context.getLong("hdfs.rollInterval", defaultRollInterval); rollSize = context.getLong("hdfs.rollSize", defaultRollSize); rollCount = context.getLong("hdfs.rollCount", defaultRollCount); @@ -387,7 +391,7 @@ public class HDFSEventSink extends AbstractSink implements Configurable { // reconstruct the path name by substituting place holders String realPath = BucketPath.escapeString(path, event.getHeaders(), - needRounding, roundUnit, roundValue); + timeZone, needRounding, roundUnit, roundValue); BucketWriter bucketWriter = sfWriters.get(realPath); // we haven't seen this file yet, so open it and cache the handle http://git-wip-us.apache.org/repos/asf/flume/blob/d1b65144/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java index b5f8c88..ba30d01 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/test/java/org/apache/flume/sink/hdfs/TestHDFSEventSink.java @@ -509,6 +509,7 @@ public class TestHDFSEventSink { Context context = new Context(); context.put("hdfs.path", testPath + "/%Y-%m-%d/%H"); + context.put("hdfs.timeZone", "UTC"); context.put("hdfs.filePrefix", fileName); context.put("hdfs.txnEventMax", String.valueOf(txnMax)); context.put("hdfs.rollCount", String.valueOf(rollCount));
