Repository: kylin
Updated Branches:
  refs/heads/master 2146f2b00 -> c67891d26


refactor some streaming classes


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/a2b693c7
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/a2b693c7
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/a2b693c7

Branch: refs/heads/master
Commit: a2b693c7955b4e5c436f4f815cd5588da93f7e98
Parents: 2146f2b
Author: shaofengshi <shaofeng...@apache.org>
Authored: Thu Jun 23 10:34:52 2016 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Wed Aug 10 10:09:22 2016 +0800

----------------------------------------------------------------------
 .../src/test/java/org/apache/kylin/job/DeployUtil.java  |  2 +-
 .../org/apache/kylin/common/util/StreamingMessage.java  |  4 ++++
 .../apache/kylin/source/kafka/KafkaStreamingInput.java  |  3 ++-
 .../org/apache/kylin/source/kafka/StreamingParser.java  |  3 ++-
 .../kylin/source/kafka/StringStreamingParser.java       | 12 ++++--------
 .../kylin/source/kafka/TimedJsonStreamParser.java       | 12 ++++--------
 .../kylin/source/kafka/diagnose/KafkaInputAnalyzer.java |  3 ++-
 .../org/apache/kylin/source/kafka/util/KafkaUtils.java  |  3 ++-
 8 files changed, 21 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
----------------------------------------------------------------------
diff --git a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java 
b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
index da97df3..6128770 100644
--- a/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
+++ b/assembly/src/test/java/org/apache/kylin/job/DeployUtil.java
@@ -166,7 +166,7 @@ public class DeployUtil {
         TimedJsonStreamParser timedJsonStreamParser = new 
TimedJsonStreamParser(tableColumns, "formatTs=true");
         StringBuilder sb = new StringBuilder();
         for (String json : data) {
-            List<String> rowColumns = timedJsonStreamParser.parse(new 
MessageAndOffset(new Message(json.getBytes()), 0)).getData();
+            List<String> rowColumns = timedJsonStreamParser.parse((new 
MessageAndOffset(new Message(json.getBytes()), 
0)).message().payload()).getData();
             sb.append(StringUtils.join(rowColumns, ","));
             sb.append(System.getProperty("line.separator"));
         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java 
b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
index f327db2..53ab195 100644
--- 
a/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
+++ 
b/core-common/src/main/java/org/apache/kylin/common/util/StreamingMessage.java
@@ -51,6 +51,10 @@ public class StreamingMessage {
         return offset;
     }
 
+    public void setOffset(long offset) {
+        this.offset = offset;
+    }
+
     public final long getTimestamp() {
         return timestamp;
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 4a3c2a9..fe3fe0a 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -205,7 +205,8 @@ public class KafkaStreamingInput implements IStreamingInput 
{
                     for (MessageAndOffset messageAndOffset : 
fetchResponse.messageSet(topic, partitionId)) {
                         offset++;
                         consumeMsgCount++;
-                        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             final long timestamp = 
streamingMessage.getTimestamp();
                             if (timestamp >= timeRange.getFirst() && timestamp 
< timeRange.getSecond()) {

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
index 89b9b56..cb6a72b 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StreamingParser.java
@@ -39,6 +39,7 @@ import java.util.List;
 import java.util.Set;
 
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.DateFormat;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.common.util.TimeUtil;
@@ -66,7 +67,7 @@ public abstract class StreamingParser {
      * @param message
      * @return StreamingMessage must not be NULL
      */
-    abstract public StreamingMessage parse(Object message);
+    abstract public StreamingMessage parse(ByteBuffer message);
 
     abstract public boolean filter(StreamingMessage streamingMessage);
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
index cb826cb..8888d67 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/StringStreamingParser.java
@@ -43,8 +43,6 @@ import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.collect.Lists;
 
-import kafka.message.MessageAndOffset;
-
 /**
  */
 public final class StringStreamingParser extends StreamingParser {
@@ -55,12 +53,10 @@ public final class StringStreamingParser extends 
StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object message) {
-        MessageAndOffset kafkaMessage = (MessageAndOffset) message;
-        final ByteBuffer payload = kafkaMessage.message().payload();
-        byte[] bytes = new byte[payload.limit()];
-        payload.get(bytes);
-        return new StreamingMessage(Lists.newArrayList(new 
String(bytes).split(",")), kafkaMessage.offset(), kafkaMessage.offset(), 
Collections.<String, Object> emptyMap());
+    public StreamingMessage parse(ByteBuffer message) {
+        byte[] bytes = new byte[message.limit()];
+        message.get(bytes);
+        return new StreamingMessage(Lists.newArrayList(new 
String(bytes).split(",")), 0, 0, Collections.<String, Object> emptyMap());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
index 51c490e..d4308db 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -41,8 +41,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
-
 import org.apache.commons.lang3.StringUtils;
+import java.nio.ByteBuffer;
 import org.apache.kylin.common.util.StreamingMessage;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
@@ -54,8 +54,6 @@ import com.fasterxml.jackson.databind.type.MapType;
 import com.fasterxml.jackson.databind.type.SimpleType;
 import com.google.common.collect.Lists;
 
-import kafka.message.MessageAndOffset;
-
 /**
  * each json message with a "timestamp" field
  */
@@ -99,10 +97,9 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
     }
 
     @Override
-    public StreamingMessage parse(Object msg) {
-        MessageAndOffset messageAndOffset = (MessageAndOffset) msg;
+    public StreamingMessage parse(ByteBuffer buffer) {
         try {
-            Map<String, String> message = mapper.readValue(new 
ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            Map<String, String> message = mapper.readValue(new 
ByteBufferBackedInputStream(buffer), mapType);
             Map<String, String> root = new TreeMap<String, 
String>(String.CASE_INSENSITIVE_ORDER);
             root.putAll(message);
             String tsStr = root.get(tsColName);
@@ -123,8 +120,7 @@ public final class TimedJsonStreamParser extends 
StreamingParser {
                 }
             }
 
-            return new StreamingMessage(result, messageAndOffset.offset(), t, 
Collections.<String, Object> emptyMap());
-
+            return new StreamingMessage(result, 0, t, Collections.<String, 
Object>emptyMap());
         } catch (IOException e) {
             logger.error("error", e);
             throw new RuntimeException(e);

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
index 752ddd7..efaa042 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/diagnose/KafkaInputAnalyzer.java
@@ -109,7 +109,8 @@ public class KafkaInputAnalyzer extends AbstractApplication 
{
                         offset++;
                         consumeMsgCount++;
 
-                        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset);
+                        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset.message().payload());
+                        streamingMessage.setOffset(messageAndOffset.offset());
                         if (streamingParser.filter(streamingMessage)) {
                             streamQueue.add(streamingMessage);
                         }

http://git-wip-us.apache.org/repos/asf/kylin/blob/a2b693c7/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
index 0d8499d..24eaa05 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaUtils.java
@@ -162,7 +162,8 @@ public final class KafkaUtils {
         final ByteBuffer payload = messageAndOffset.message().payload();
         byte[] bytes = new byte[payload.limit()];
         payload.get(bytes);
-        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset);
+        final StreamingMessage streamingMessage = 
streamingParser.parse(messageAndOffset.message().payload());
+        streamingMessage.setOffset(messageAndOffset.offset());
         logger.debug(String.format("The timestamp of topic: %s, partitionId: 
%d, offset: %d is: %d", topic, partitionId, offset, 
streamingMessage.getTimestamp()));
         return streamingMessage.getTimestamp();
 

Reply via email to