Repository: kylin Updated Branches: refs/heads/master 2146f2b00 -> c67891d26
refactor some streaming classes Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2b693c7 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2b693c7 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2b693c7 Branch: refs/heads/master Commit: a2b693c7955b4e5c436f4f815cd5588da93f7e98 Parents: 2146f2b Author: shaofengshi <shaofeng...@apache.org> Authored: Thu Jun 23 10:34:52 2016 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Wed Aug 10 10:09:22 2016 +0800 ---------------------------------------------------------------------- .../src/test/java/org/apache/kylin/job/DeployUtil.java | 2 +- .../org/apache/kylin/common/util/StreamingMessage.java | 4 ++++ .../apache/kylin/source/kafka/KafkaStreamingInput.java | 3 ++- .../org/apache/kylin/source/kafka/StreamingParser.java | 3 ++- .../kylin/source/kafka/StringStreamingParser.java | 12 ++++-------- .../kylin/source/kafka/TimedJsonStreamParser.java | 12 ++++-------- .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java | 3 ++- .../org/apache/kylin/source/kafka/util/KafkaUtils.java | 3 ++- 8 files changed, 21 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java ---------------------------------------------------------------------- diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java index da97df3..6128770 100644 --- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java +++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java @@ -166,7 +166,7 @@ public class DeployUtil { TimedJsonStreamParser timedJsonStreamParser = new TimedJsonStreamParser(tableColumns, "formatTs=true"); StringBuilder sb = new StringBuilder(); for (String json : data) { - List<String> rowColumns = timedJsonStreamParser.parse(new MessageAndOffset(new Message(json.getBytes()), 0)).getData(); + List<String> rowColumns = timedJsonStreamParser.parse((new MessageAndOffset(new Message(json.getBytes()), 0)).message().payload()).getData(); sb.append(StringUtils.join(rowColumns, ",")); sb.append(System.getProperty("line.separator")); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java index f327db2..53ab195 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java @@ -51,6 +51,10 @@ public class StreamingMessage { return offset; } + public void setOffset(long offset) { + this.offset = offset; + } + public final long getTimestamp() { return timestamp; } http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java index 4a3c2a9..fe3fe0a 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java @@ -205,7 +205,8 @@ public class KafkaStreamingInput implements IStreamingInput { for (MessageAndOffset messageAndOffset : fetchResponse.messageSet(topic, partitionId)) { offset++; consumeMsgCount++; - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); if (streamingParser.filter(streamingMessage)) { final long timestamp = streamingMessage.getTimestamp(); if (timestamp >= timeRange.getFirst() && timestamp < timeRange.getSecond()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java index 89b9b56..cb6a72b 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java @@ -39,6 +39,7 @@ import java.util.List; import java.util.Set; import org.apache.commons.lang3.StringUtils; +import java.nio.ByteBuffer; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.common.util.TimeUtil; @@ -66,7 +67,7 @@ public abstract class StreamingParser { * @param message * @return StreamingMessage must not be NULL */ - abstract public StreamingMessage parse(Object message); + abstract public StreamingMessage parse(ByteBuffer message); abstract public boolean filter(StreamingMessage streamingMessage); http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java index cb826cb..8888d67 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java @@ -43,8 +43,6 @@ import org.apache.kylin.metadata.model.TblColRef; import com.google.common.collect.Lists; -import kafka.message.MessageAndOffset; - /** */ public final class StringStreamingParser extends StreamingParser { @@ -55,12 +53,10 @@ public final class StringStreamingParser extends StreamingParser { } @Override - public StreamingMessage parse(Object message) { - MessageAndOffset kafkaMessage = (MessageAndOffset) message; - final ByteBuffer payload = kafkaMessage.message().payload(); - byte[] bytes = new byte[payload.limit()]; - payload.get(bytes); - return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), Collections.<String, Object> emptyMap()); + public StreamingMessage parse(ByteBuffer message) { + byte[] bytes = new byte[message.limit()]; + message.get(bytes); + return new StreamingMessage(Lists.newArrayList(new String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap()); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java index 51c490e..d4308db 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java @@ -41,8 +41,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.TreeMap; - import org.apache.commons.lang3.StringUtils; +import java.nio.ByteBuffer; import org.apache.kylin.common.util.StreamingMessage; import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; @@ -54,8 +54,6 @@ import com.fasterxml.jackson.databind.type.MapType; import com.fasterxml.jackson.databind.type.SimpleType; import com.google.common.collect.Lists; -import kafka.message.MessageAndOffset; - /** * each json message with a "timestamp" field */ @@ -99,10 +97,9 @@ public final class TimedJsonStreamParser extends StreamingParser { } @Override - public StreamingMessage parse(Object msg) { - MessageAndOffset messageAndOffset = (MessageAndOffset) msg; + public StreamingMessage parse(ByteBuffer buffer) { try { - Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType); + Map<String, String> message = mapper.readValue(new ByteBufferBackedInputStream(buffer), mapType); Map<String, String> root = new TreeMap<String, String>(String.CASE_INSENSITIVE_ORDER); root.putAll(message); String tsStr = root.get(tsColName); @@ -123,8 +120,7 @@ public final class TimedJsonStreamParser extends StreamingParser { } } - return new StreamingMessage(result, messageAndOffset.offset(), t, Collections.<String, Object> emptyMap()); - + return new StreamingMessage(result, 0, t, Collections.<String, Object>emptyMap()); } catch (IOException e) { logger.error("error", e); throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java index 752ddd7..efaa042 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java @@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication { offset++; consumeMsgCount++; - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); if (streamingParser.filter(streamingMessage)) { streamQueue.add(streamingMessage); } http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java ---------------------------------------------------------------------- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java index 0d8499d..24eaa05 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java @@ -162,7 +162,8 @@ public final class KafkaUtils { final ByteBuffer payload = messageAndOffset.message().payload(); byte[] bytes = new byte[payload.limit()]; payload.get(bytes); - final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset); + final StreamingMessage streamingMessage = streamingParser.parse(messageAndOffset.message().payload()); + streamingMessage.setOffset(messageAndOffset.offset()); logger.debug(String.format("The timestamp of topic: %s, partitionId: %d, offset: %d is: %d", topic, partitionId, offset, streamingMessage.getTimestamp())); return streamingMessage.getTimestamp();