Repository: kylin Updated Branches: refs/heads/master 0a3541254 -> 5b4cbc4b6
small refactor in StreamingParser Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5b4cbc4b Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5b4cbc4b Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5b4cbc4b Branch: refs/heads/master Commit: 5b4cbc4b61e444093b88fdb4ba3d3afeba7468d3 Parents: 0a35412 Author: shaofengshi <shaofeng...@apache.org> Authored: Wed Jun 15 09:52:17 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Jun 15 09:52:47 2016 +0800 ---------------------------------------------------------------------- .../java/org/apache/kylin/cube/CubeSegment.java | 18 ++-- .../kylin/source/kafka/KafkaStreamingInput.java | 18 +++- .../kylin/source/kafka/StreamingParser.java | 93 +++++++++++++------- .../source/kafka/StringStreamingParser.java | 3 +- .../source/kafka/TimedJsonStreamParser.java | 57 ++++-------- 5 files changed, 112 insertions(+), 77 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java ---------------------------------------------------------------------- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java index cbcee14..f79e06d 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeSegment.java @@ -19,13 +19,10 @@ package org.apache.kylin.cube; import java.text.SimpleDateFormat; -import java.util.Collection; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.TimeZone; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import com.fasterxml.jackson.annotation.JsonInclude; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.Dictionary; @@ -101,6 +98,10 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment @JsonProperty("rowkey_stats") private List<Object[]> rowkeyStats = Lists.newArrayList(); + @JsonProperty("additionalInfo") + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private HashMap<String, String> additionalInfo = new LinkedHashMap<String, String>(); + private volatile Map<Long, Short> cuboidBaseShards = Maps.newHashMap();//cuboid id ==> base(starting) shard for this cuboid public CubeDesc getCubeDesc() { @@ -523,4 +524,11 @@ public class CubeSegment implements Comparable<CubeSegment>, IRealizationSegment this.indexPath = indexPath; } + public HashMap<String, String> getAdditionalInfo() { + return additionalInfo; + } + + public void setAdditionalInfo(HashMap<String, String> additionalInfo) { + this.additionalInfo = additionalInfo; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index c05119f..564c221 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -40,6 +40,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import com.google.common.base.Function; import kafka.cluster.Broker; import kafka.javaapi.FetchResponse; import kafka.javaapi.PartitionMetadata; @@ -49,11 +50,14 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; +import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; import org.apache.kylin.engine.streaming.IStreamingInput; import org.apache.kylin.common.util.StreamingBatch; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.engine.streaming.StreamingConfig; import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.metadata.model.IntermediateColumnDesc; +import org.apache.kylin.metadata.model.TblColRef; import org.apache.kylin.metadata.realization.RealizationType; import org.apache.kylin.source.kafka.config.KafkaClusterConfig; import org.apache.kylin.source.kafka.config.KafkaConfig; @@ -64,6 +68,8 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +import javax.annotation.Nullable; + @SuppressWarnings("unused") public class KafkaStreamingInput implements IStreamingInput { @@ -88,10 +94,20 @@ public class KafkaStreamingInput implements IStreamingInput { try { final KafkaConfigManager kafkaConfigManager = KafkaConfigManager.getInstance(kylinConfig); final KafkaConfig kafkaConfig = kafkaConfigManager.getKafkaConfig(streaming); - final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig, realizationType, realizationName); + List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cube.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() { + @Nullable + @Override + public TblColRef apply(IntermediateColumnDesc input) { + return input.getColRef(); + } + }); + + final StreamingParser streamingParser = StreamingParser.getStreamingParser(kafkaConfig.getParserName(), kafkaConfig.getParserProperties(), columns); final ExecutorService executorService = Executors.newCachedThreadPool(); final List<Future<List<StreamingMessage>>> futures = Lists.newArrayList(); for (final KafkaClusterConfig kafkaClusterConfig : kafkaConfig.getKafkaClusterConfigs()) { + + final int partitionCount = KafkaRequester.getKafkaTopicMeta(kafkaClusterConfig).getPartitionIds().size(); for (int i = 0; i < partitionCount; ++i) { final StreamingMessageProducer producer = new StreamingMessageProducer(kafkaClusterConfig, i, Pair.newPair(startTime, endTime), kafkaConfig.getMargin(), streamingParser); http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 7b326e2..9075c77 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -36,54 +36,87 @@ package org.apache.kylin.source.kafka; import java.lang.reflect.Constructor; import java.util.List; +import java.util.Set; -import javax.annotation.Nullable; - -import kafka.message.MessageAndOffset; - +import com.google.common.collect.Sets; import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.cube.CubeInstance; -import org.apache.kylin.cube.CubeManager; -import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc; -import org.apache.kylin.engine.streaming.StreamingManager; +import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.metadata.model.IntermediateColumnDesc; +import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.metadata.model.TblColRef; -import org.apache.kylin.metadata.realization.RealizationType; -import org.apache.kylin.source.kafka.config.KafkaConfig; - -import com.google.common.base.Function; -import com.google.common.collect.Lists; /** * By convention stream parsers should have a constructor with (List<TblColRef> allColumns, String propertiesStr) as params */ public abstract class StreamingParser { + public static final Set derivedTimeColumns = Sets.newHashSet(); + static { + derivedTimeColumns.add("minute_start"); + derivedTimeColumns.add("hour_start"); + derivedTimeColumns.add("day_start"); + derivedTimeColumns.add("week_start"); + derivedTimeColumns.add("month_start"); + derivedTimeColumns.add("quarter_start"); + derivedTimeColumns.add("year_start"); + } + + /** - * @param kafkaMessage + * @param message * @return StreamingMessage must not be NULL */ - abstract public StreamingMessage parse(MessageAndOffset kafkaMessage); + abstract public StreamingMessage parse(Object message); abstract public boolean filter(StreamingMessage streamingMessage); - public static StreamingParser getStreamingParser(KafkaConfig kafkaConfig, RealizationType realizationType, String realizationName) throws ReflectiveOperationException { - final CubeInstance cubeInstance = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(realizationName); - List<TblColRef> columns = Lists.transform(new CubeJoinedFlatTableDesc(cubeInstance.getDescriptor(), null).getColumnList(), new Function<IntermediateColumnDesc, TblColRef>() { - @Nullable - @Override - public TblColRef apply(IntermediateColumnDesc input) { - return input.getColRef(); - } - }); - if (!StringUtils.isEmpty(kafkaConfig.getParserName())) { - Class clazz = Class.forName(kafkaConfig.getParserName()); + public static StreamingParser getStreamingParser(String parserName, String parserProperties, List<TblColRef> columns) throws ReflectiveOperationException { + if (!StringUtils.isEmpty(parserName)) { + Class clazz = Class.forName(parserName); Constructor constructor = clazz.getConstructor(List.class, String.class); - return (StreamingParser) constructor.newInstance(columns, kafkaConfig.getParserProperties()); + return (StreamingParser) constructor.newInstance(columns, parserProperties); } else { - throw new IllegalStateException("invalid StreamingConfig:" + kafkaConfig.getName() + " missing property StreamingParser"); + throw new IllegalStateException("invalid StreamingConfig, parserName " + parserName + ", parserProperties " + parserProperties + "."); + } + } + + /** + * Calculate the derived time column value and put to the result list. + * @param columnName the column name, should be in lower case + * @param result the string list which representing a row + * @param t the timestamp that to calculate the derived time + * @return true if the columnName is a derived time column; otherwise false; + */ + public static final boolean populateDerivedTimeColumns(String columnName, List<String> result, long t) { + if (derivedTimeColumns.contains(columnName) == false) + return false; + + long normalized = 0; + if (columnName.equals("minute_start")) { + normalized = TimeUtil.getMinuteStart(t); + result.add(DateFormat.formatToTimeStr(normalized)); + } else if (columnName.equals("hour_start")) { + normalized = TimeUtil.getHourStart(t); + result.add(DateFormat.formatToTimeStr(normalized)); + } else if (columnName.equals("day_start")) { + //from day_start on, formatTs will output date format + normalized = TimeUtil.getDayStart(t); + result.add(DateFormat.formatToDateStr(normalized)); + } else if (columnName.equals("week_start")) { + normalized = TimeUtil.getWeekStart(t); + result.add(DateFormat.formatToDateStr(normalized)); + } else if (columnName.equals("month_start")) { + normalized = TimeUtil.getMonthStart(t); + result.add(DateFormat.formatToDateStr(normalized)); + } else if (columnName.equals("quarter_start")) { + normalized = TimeUtil.getQuarterStart(t); + result.add(DateFormat.formatToDateStr(normalized)); + } else if (columnName.equals("year_start")) { + normalized = TimeUtil.getYearStart(t); + result.add(DateFormat.formatToDateStr(normalized)); } + + return true; } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java index 9691ea7..5226899 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -55,7 +55,8 @@ public final class StringStreamingParser extends StreamingParser { } @Override - public StreamingMessage parse(MessageAndOffset kafkaMessage) { + public StreamingMessage parse(Object message) { + MessageAndOffset kafkaMessage = (MessageAndOffset) message; final ByteBuffer payload = kafkaMessage.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); http://git-wip-us.apache.org/repos/asf/kylin/blob/5b4cbc4b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index e3075d5..63f5637 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -40,9 +40,7 @@ import java.util.*; import kafka.message.MessageAndOffset; import org.apache.commons.lang3.StringUtils; -import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StreamingMessage; -import org.apache.kylin.common.util.TimeUtil; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +58,7 @@ public final class TimedJsonStreamParser extends StreamingParser { private static final Logger logger = LoggerFactory.getLogger(TimedJsonStreamParser.class); + private List<TblColRef> allColumns; private boolean formatTs = false;//not used private final ObjectMapper mapper = new ObjectMapper(); @@ -75,14 +74,14 @@ public final class TimedJsonStreamParser extends StreamingParser { String[] parts = prop.split("="); if (parts.length == 2) { switch (parts[0]) { - case "formatTs": - this.formatTs = Boolean.valueOf(parts[1]); - break; - case "tsColName": - this.tsColName = parts[1]; - break; - default: - break; + case "formatTs": + this.formatTs = Boolean.valueOf(parts[1]); + break; + case "tsColName": + this.tsColName = parts[1]; + break; + default: + break; } } } catch (Exception e) { @@ -96,14 +95,13 @@ public final class TimedJsonStreamParser extends StreamingParser { } @Override - public StreamingMessage parse(MessageAndOffset messageAndOffset) { + public StreamingMessage parse(Object msg) { + MessageAndOffset messageAndOffset = (MessageAndOffset) msg; try { Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); root.putAll(message); String tsStr = root.get(tsColName); - //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), "Timestamp field " + tsColName + // - //" cannot be null, the message offset is " + messageAndOffset.getOffset() + " content is " + new String(messageAndOffset.getRawData())); long t; if (StringUtils.isEmpty(tsStr)) { t = 0; @@ -112,38 +110,16 @@ public final class TimedJsonStreamParser extends StreamingParser { } ArrayList<String> result = Lists.newArrayList(); - long normalized = 0; for (TblColRef column : allColumns) { - String columnName = column.getName(); - if (columnName.equalsIgnoreCase("minute_start")) { - normalized = TimeUtil.getMinuteStart(t); - result.add(DateFormat.formatToTimeStr(normalized)); - } else if (columnName.equalsIgnoreCase("hour_start")) { - normalized = TimeUtil.getHourStart(t); - result.add(DateFormat.formatToTimeStr(normalized)); - } else if (columnName.equalsIgnoreCase("day_start")) { - //from day_start on, formatTs will output date format - normalized = TimeUtil.getDayStart(t); - result.add(DateFormat.formatToDateStr(normalized)); - } else if (columnName.equalsIgnoreCase("week_start")) { - normalized = TimeUtil.getWeekStart(t); - result.add(DateFormat.formatToDateStr(normalized)); - } else if (columnName.equalsIgnoreCase("month_start")) { - normalized = TimeUtil.getMonthStart(t); - result.add(DateFormat.formatToDateStr(normalized)); - } else if (columnName.equalsIgnoreCase("quarter_start")) { - normalized = TimeUtil.getQuarterStart(t); - result.add(DateFormat.formatToDateStr(normalized)); - } else if (columnName.equalsIgnoreCase("year_start")) { - normalized = TimeUtil.getYearStart(t); - result.add(DateFormat.formatToDateStr(normalized)); - } else { - String x = root.get(columnName.toLowerCase()); + String columnName = column.getName().toLowerCase(); + + if (populateDerivedTimeColumns(columnName, result, t) == false) { + String x = root.get(columnName); result.add(x); } } - return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap()); + return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object>emptyMap()); } catch (IOException e) { logger.error("error", e); @@ -151,6 +127,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } } + @Override public boolean filter(StreamingMessage streamingMessage) { return true;