Updated Branches: refs/heads/trunk 6662b34c0 -> 0a855488a
FLUME-2014. Race condition when using local timestamp with BucketPath (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/0a855488 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/0a855488 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/0a855488 Branch: refs/heads/trunk Commit: 0a855488a152cf695e2e1e2f89b67ed2e3095422 Parents: 6662b34 Author: Hari Shreedharan <[email protected]> Authored: Mon Apr 22 15:30:49 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon Apr 22 15:30:49 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/formatter/output/BucketPath.java | 69 +++++++++++++-- .../flume/formatter/output/TestBucketPath.java | 29 ++++++ 2 files changed, 90 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java index 971c75c..bef4b1f 100644 --- a/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java +++ b/flume-ng-core/src/main/java/org/apache/flume/formatter/output/BucketPath.java @@ -49,10 +49,14 @@ public class BucketPath { * Returns true if in contains a substring matching TAG_REGEX (i.e. of the * form %{...} or %x. */ + @VisibleForTesting + @Deprecated public static boolean containsTag(String in) { return tagPattern.matcher(in).find(); } + @VisibleForTesting + @Deprecated public static String expandShorthand(char c) { // It's a date switch (c) { @@ -116,16 +120,25 @@ public class BucketPath { * * Dates follow the same format as unix date, with a few exceptions. * + * <p>This static method will be REMOVED in a future version of Flume</p> + * */ + @VisibleForTesting + @Deprecated public static String replaceShorthand(char c, Map<String, String> headers) { return replaceShorthand(c, headers, false, 0, 0); } /** * A wrapper around - * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, int)} + * {@link BucketPath#replaceShorthand(char, Map, TimeZone, boolean, int, + * int, boolean)} * with the timezone set to the default. + * + * <p>This static method will be REMOVED in a future version of Flume</p> */ + @VisibleForTesting + @Deprecated public static String replaceShorthand(char c, Map<String, String> headers, boolean needRounding, int unit, int roundDown) { return replaceShorthand(c, headers, null, needRounding, unit, roundDown, @@ -140,6 +153,9 @@ public class BucketPath { * Returns the empty string if an escape is not recognized. * * Dates follow the same format as unix date, with a few exceptions. + * + * <p>This static method will be REMOVED in a future version of Flume</p> + * * @param c - The character to replace. * @param headers - Event headers * @param timeZone - The timezone to use for formatting the timestamp @@ -153,22 +169,40 @@ public class BucketPath { * value, smaller than the time supplied, defaults to 1, if <= 0(rounds off * to the second/minute/hour immediately lower than the timestamp supplied. * Ignored if needRounding is false. + * * @return */ + @VisibleForTesting + @Deprecated public static String replaceShorthand(char c, Map<String, String> headers, TimeZone timeZone, boolean needRounding, int unit, int roundDown, boolean useLocalTimestamp) { - long ts; - String timestampHeader; + long ts = 0; + if (useLocalTimestamp) { + ts = clock.currentTimeMillis(); + } + return replaceShorthand(c, headers, timeZone, needRounding, unit, + roundDown, false, ts); + } + + /** + * Not intended as a public API + */ + @VisibleForTesting + protected static String replaceShorthand(char c, Map<String, String> headers, + TimeZone timeZone, boolean needRounding, int unit, int roundDown, + boolean useLocalTimestamp, long ts) { + + String timestampHeader = null; try { if(!useLocalTimestamp) { timestampHeader = headers.get("timestamp"); Preconditions.checkNotNull(timestampHeader, "Expected timestamp in " + "the Flume event headers, but it was null"); + ts = Long.valueOf(timestampHeader); } else { - timestampHeader = String.valueOf(clock.currentTimeMillis()); + timestampHeader = String.valueOf(ts); } - ts = Long.valueOf(timestampHeader); } catch (NumberFormatException e) { throw new RuntimeException("Flume wasn't able to parse timestamp header" + " in the event to resolve time based bucketing. Please check that" @@ -302,7 +336,8 @@ public class BucketPath { /** * A wrapper around - * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int)} + * {@link BucketPath#escapeString(String, Map, TimeZone, boolean, int, int, + boolean)} * with the timezone set to the default. */ public static String escapeString(String in, Map<String, String> headers, @@ -335,6 +370,9 @@ public class BucketPath { public static String escapeString(String in, Map<String, String> headers, TimeZone timeZone, boolean needRounding, int unit, int roundDown, boolean useLocalTimeStamp) { + + long ts = clock.currentTimeMillis(); + Matcher matcher = tagPattern.matcher(in); StringBuffer sb = new StringBuffer(); while (matcher.find()) { @@ -356,7 +394,7 @@ public class BucketPath { "Expected to match single character tag in string " + in); char c = matcher.group(1).charAt(0); replacement = replaceShorthand(c, headers, timeZone, - needRounding, unit, roundDown, useLocalTimeStamp); + needRounding, unit, roundDown, useLocalTimeStamp, ts); } // The replacement string must have '$' and '\' chars escaped. This @@ -383,10 +421,15 @@ public class BucketPath { * mapping of an attribute name to the value based on the escape sequence * found in the argument string. */ + @VisibleForTesting + @Deprecated public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers) { return getEscapeMapping(in, headers, false, 0, 0); } + + @VisibleForTesting + @Deprecated public static Map<String, String> getEscapeMapping(String in, Map<String, String> headers, boolean needRounding, int unit, int roundDown) { @@ -421,10 +464,20 @@ public class BucketPath { } - //Should not be called from outside unit tests. + /* + * May not be called from outside unit tests. + */ @VisibleForTesting public static void setClock(Clock clk) { clock = clk; } + + /* + * May not be called from outside unit tests. + */ + @VisibleForTesting + public static Clock getClock() { + return clock; + } } http://git-wip-us.apache.org/repos/asf/flume/blob/0a855488/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java ---------------------------------------------------------------------- diff --git a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java index 9cfefc0..c441c4a 100644 --- a/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java +++ b/flume-ng-core/src/test/java/org/apache/flume/formatter/output/TestBucketPath.java @@ -26,10 +26,16 @@ import java.util.HashMap; import java.util.Map; import java.util.TimeZone; +import org.apache.flume.Clock; +import org.joda.time.format.DateTimeFormatter; +import org.joda.time.format.ISODateTimeFormat; import org.junit.Assert; import org.junit.Before; import org.junit.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestBucketPath { Calendar cal; Map<String, String> headers; @@ -112,4 +118,27 @@ public class TestBucketPath { System.out.println("Expected String: "+ expectedString); Assert.assertEquals(expectedString, escapedString); } + + @Test + public void testDateRace() { + Clock mockClock = mock(Clock.class); + DateTimeFormatter parser = ISODateTimeFormat.dateTimeParser(); + long two = parser.parseMillis("2013-04-21T02:59:59-00:00"); + long three = parser.parseMillis("2013-04-21T03:00:00-00:00"); + when(mockClock.currentTimeMillis()).thenReturn(two, three); + + // save & modify static state (yuck) + Clock origClock = BucketPath.getClock(); + BucketPath.setClock(mockClock); + + String pat = "%H:%M"; + String escaped = BucketPath.escapeString(pat, + new HashMap<String, String>(), + TimeZone.getTimeZone("UTC"), true, Calendar.MINUTE, 10, true); + + // restore static state + BucketPath.setClock(origClock); + + Assert.assertEquals("Race condition detected", "02:50", escaped); + } }
