KYLIN-1270 Improve TimedJsonStreamParser to support week_start, month_start, quarter_start and year_start
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ec5c28d9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ec5c28d9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ec5c28d9 Branch: refs/heads/2.x-staging Commit: ec5c28d99d58c761b7dd6a0be2301fc8563f156c Parents: 4d4e743 Author: honma <ho...@ebay.com> Authored: Wed Dec 30 21:33:37 2015 +0800 Committer: honma <ho...@ebay.com> Committed: Mon Jan 4 21:39:54 2016 +0800 ---------------------------------------------------------------------- .../org/apache/kylin/common/util/TimeUtil.java | 44 +++++++++++++++-- .../apache/kylin/common/util/TimeUtilTest.java | 21 +++++++- .../source/kafka/TimedJsonStreamParser.java | 51 ++++++++++++++------ 3 files changed, 94 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/ec5c28d9/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java index c79e88b..17868a6 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/TimeUtil.java @@ -1,12 +1,13 @@ package org.apache.kylin.common.util; +import java.util.Calendar; +import java.util.TimeZone; + /** */ public class TimeUtil { - public enum NormalizedTimeUnit { - MINUTE, HOUR, DAY - } + private static TimeZone gmt = TimeZone.getTimeZone("GMT"); private static long ONE_MINUTE_TS = 60 * 1000; private static long ONE_HOUR_TS = 60 * ONE_MINUTE_TS; private static long ONE_DAY_TS = 24 * ONE_HOUR_TS; @@ -23,7 +24,40 @@ public class TimeUtil { return ts / ONE_DAY_TS * ONE_DAY_TS; } - public static long getNextPeriodStart(long ts, long period) { - return 
((ts + period - 1) / period) * period; + public static long getWeekStart(long ts) { + Calendar calendar = Calendar.getInstance(gmt); + calendar.setTimeInMillis(getDayStart(ts)); + calendar.add(Calendar.DAY_OF_MONTH, Calendar.SUNDAY - calendar.get(Calendar.DAY_OF_WEEK)); // weeks start on Sunday regardless of default locale; SUNDAY is the smallest DAY_OF_WEEK value, so this never moves forward past ts + return calendar.getTimeInMillis(); } + + public static long getMonthStart(long ts) { + Calendar calendar = Calendar.getInstance(gmt); + calendar.setTimeInMillis(ts); + int year = calendar.get(Calendar.YEAR); + int month = calendar.get(Calendar.MONTH); + calendar.clear(); + calendar.set(year, month, 1); + return calendar.getTimeInMillis(); + } + + public static long getQuarterStart(long ts) { + Calendar calendar = Calendar.getInstance(gmt); + calendar.setTimeInMillis(ts); + int year = calendar.get(Calendar.YEAR); + int month = calendar.get(Calendar.MONTH); + calendar.clear(); + calendar.set(year, month / 3 * 3, 1); + return calendar.getTimeInMillis(); + } + + public static long getYearStart(long ts) { + Calendar calendar = Calendar.getInstance(gmt); + calendar.setTimeInMillis(ts); + int year = calendar.get(Calendar.YEAR); + calendar.clear(); + calendar.set(year, 0, 1); + return calendar.getTimeInMillis(); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/ec5c28d9/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java ---------------------------------------------------------------------- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java index d81d49a..3fdf6aa 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/TimeUtilTest.java @@ -3,14 +3,19 @@ package org.apache.kylin.common.util; import java.text.ParseException; import java.text.SimpleDateFormat; import java.util.Calendar; +import java.util.TimeZone; -import
org.apache.kylin.common.util.TimeUtil.NormalizedTimeUnit; import org.junit.Assert; import org.junit.Test; /** */ public class TimeUtilTest { + + public enum NormalizedTimeUnit { + MINUTE, HOUR, DAY, WEEK, MONTH, QUARTER, YEAR + } + public static long normalizeTime(long timeMillis, NormalizedTimeUnit unit) { Calendar a = Calendar.getInstance(); Calendar b = Calendar.getInstance(); @@ -28,6 +33,7 @@ public class TimeUtilTest { @Test public void basicTest() throws ParseException { java.text.DateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss"); + dateFormat.setTimeZone(TimeZone.getTimeZone("GMT")); long t1 = dateFormat.parse("2012/01/01 00:00:01").getTime(); Assert.assertEquals(normalizeTime(t1, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t1)); @@ -36,6 +42,19 @@ public class TimeUtilTest { long t2 = dateFormat.parse("2012/12/31 11:02:01").getTime(); Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.HOUR), TimeUtil.getHourStart(t2)); Assert.assertEquals(normalizeTime(t2, NormalizedTimeUnit.MINUTE), TimeUtil.getMinuteStart(t2)); + + long t3 = dateFormat.parse("2012/12/31 11:02:01").getTime(); + Assert.assertEquals(dateFormat.parse("2012/12/1 00:00:00").getTime(), TimeUtil.getMonthStart(t3)); + Assert.assertEquals(dateFormat.parse("2012/10/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t3)); + Assert.assertEquals(dateFormat.parse("2012/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t3)); + Assert.assertEquals(dateFormat.parse("2012/12/30 00:00:00").getTime(), TimeUtil.getWeekStart(t3)); + + long t4 = dateFormat.parse("2015/01/01 10:01:30").getTime(); + Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getMonthStart(t4)); + Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getQuarterStart(t4)); + Assert.assertEquals(dateFormat.parse("2015/1/1 00:00:00").getTime(), TimeUtil.getYearStart(t4)); + Assert.assertEquals(dateFormat.parse("2014/12/28 00:00:00").getTime(), 
TimeUtil.getWeekStart(t4)); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/ec5c28d9/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 00f93a5..c1d8379 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -34,22 +34,28 @@ package org.apache.kylin.source.kafka; -import com.fasterxml.jackson.databind.JavaType; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.fasterxml.jackson.databind.type.MapType; -import com.fasterxml.jackson.databind.type.SimpleType; -import com.google.common.collect.Lists; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + import kafka.message.MessageAndOffset; + import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.util.DateFormat; -import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.common.util.StreamingMessage; +import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.io.IOException; -import java.util.*; +import com.fasterxml.jackson.databind.JavaType; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.type.MapType; +import com.fasterxml.jackson.databind.type.SimpleType; +import com.google.common.collect.Lists; /** * each json message with a "timestamp" field @@ -108,25 +114,38 @@ public final class TimedJsonStreamParser extends StreamingParser { } ArrayList<String> result = Lists.newArrayList(); + 
long normalized = 0; for (TblColRef column : allColumns) { String columnName = column.getName(); if (columnName.equalsIgnoreCase("minute_start")) { - long minuteStart = TimeUtil.getMinuteStart(t); - result.add(formatTs ? DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart)); + normalized = TimeUtil.getMinuteStart(t); + result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized)); } else if (columnName.equalsIgnoreCase("hour_start")) { - long hourStart = TimeUtil.getHourStart(t); - result.add(formatTs ? DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart)); + normalized = TimeUtil.getHourStart(t); + result.add(formatTs ? DateFormat.formatToTimeStr(normalized) : String.valueOf(normalized)); } else if (columnName.equalsIgnoreCase("day_start")) { - //of day start we'll add yyyy-mm-dd - long ts = TimeUtil.getDayStart(t); - result.add(DateFormat.formatToDateStr(ts)); + //from day_start on, formatTs will output date format + normalized = TimeUtil.getDayStart(t); + result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized)); + } else if (columnName.equalsIgnoreCase("week_start")) { + normalized = TimeUtil.getWeekStart(t); + result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized)); + } else if (columnName.equalsIgnoreCase("month_start")) { + normalized = TimeUtil.getMonthStart(t); + result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized)); + } else if (columnName.equalsIgnoreCase("quarter_start")) { + normalized = TimeUtil.getQuarterStart(t); + result.add(formatTs ? DateFormat.formatToDateStr(normalized) : String.valueOf(normalized)); + } else if (columnName.equalsIgnoreCase("year_start")) { + normalized = TimeUtil.getYearStart(t); + result.add(formatTs ? 
DateFormat.formatToDateStr(normalized) : String.valueOf(normalized)); } else { String x = root.get(columnName.toLowerCase()); result.add(x); } } - return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap()); + return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap()); } catch (IOException e) { logger.error("error", e);