KYLIN-1011

Project: http://git-wip-us.apache.org/repos/asf/incubator-kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-kylin/commit/55a85df0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-kylin/tree/55a85df0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-kylin/diff/55a85df0

Branch: refs/heads/KYLIN-1011
Commit: 55a85df0cfbabbe0a176d05ae8830ea4c6ec9fa6
Parents: cc15d63
Author: qianhao.zhou <[email protected]>
Authored: Wed Sep 9 19:13:01 2015 +0800
Committer: qianhao.zhou <[email protected]>
Committed: Tue Sep 22 17:29:32 2015 +0800

----------------------------------------------------------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 +++++++
 build/bin/kylin.sh                              |   4 +-
 build/script/compress.sh                        |   1 +
 .../kylin/engine/streaming/StreamingCLI.java    |  99 ------------
 .../kylin/engine/streaming/cli/MonitorCLI.java  |  70 +++++++++
 .../engine/streaming/cli/StreamingCLI.java      | 120 +++++++++++++++
 .../streaming/monitor/StreamingMonitor.java     | 154 +++++++++++++++++++
 .../engine/streaming/util/StreamingUtils.java   |   2 +-
 .../kafka/ByteBufferBackedInputStream.java      |  53 +++++++
 .../kylin/source/kafka/KafkaStreamingInput.java |   6 +-
 .../source/kafka/TimedJsonStreamParser.java     | 142 +++++++++++++++++
 .../kylin/source/kafka/config/KafkaConfig.java  |  12 ++
 .../apache/kylin/job/monitor/MonitorCLI.java    |  69 ---------
 .../kylin/job/monitor/StreamingMonitor.java     | 154 -------------------
 .../kylin/job/streaming/KafkaDataLoader.java    |  54 -------
 .../kylin/job/streaming/StreamingBootstrap.java |   2 +-
 16 files changed, 614 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java 
b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
new file mode 100644
index 0000000..95fbc9d
--- /dev/null
+++ b/assembly/src/test/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
@@ -0,0 +1,54 @@
+package org.apache.kylin.job.streaming;
+
+import java.util.List;
+import java.util.Properties;
+
+import javax.annotation.Nullable;
+
+import kafka.javaapi.producer.Producer;
+import kafka.producer.KeyedMessage;
+import kafka.producer.ProducerConfig;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.streaming.BrokerConfig;
+import org.apache.kylin.streaming.KafkaClusterConfig;
+import org.apache.kylin.streaming.StreamingConfig;
+
+import com.google.common.base.Function;
+import com.google.common.collect.Collections2;
+import com.google.common.collect.Lists;
+
+/**
+ * Load prepared data into kafka(for test use)
+ */
+public class KafkaDataLoader {
+
+    public static void loadIntoKafka(StreamingConfig streamingConfig, 
List<String> messages) {
+
+        KafkaClusterConfig clusterConfig = 
streamingConfig.getKafkaClusterConfigs().get(0);
+        String brokerList = 
StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new 
Function<BrokerConfig, String>() {
+            @Nullable
+            @Override
+            public String apply(BrokerConfig brokerConfig) {
+                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
+            }
+        }), ",");
+        Properties props = new Properties();
+        props.put("metadata.broker.list", brokerList);
+        props.put("serializer.class", "kafka.serializer.StringEncoder");
+        props.put("request.required.acks", "1");
+
+        ProducerConfig config = new ProducerConfig(props);
+
+        Producer<String, String> producer = new Producer<String, 
String>(config);
+
+        List<KeyedMessage<String, String>> keyedMessages = 
Lists.newArrayList();
+        for (int i = 0; i < messages.size(); ++i) {
+            KeyedMessage<String, String> keyedMessage = new 
KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), 
messages.get(i));
+            keyedMessages.add(keyedMessage);
+        }
+        producer.send(keyedMessages);
+        producer.close();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/bin/kylin.sh
----------------------------------------------------------------------
diff --git a/build/bin/kylin.sh b/build/bin/kylin.sh
index b27864c..b581e09 100644
--- a/build/bin/kylin.sh
+++ b/build/bin/kylin.sh
@@ -115,7 +115,7 @@ then
         -Dkylin.hive.dependency=${hive_dependency} \
         -Dkylin.hbase.dependency=${hbase_dependency} \
         -Dspring.profiles.active=${spring_profile} \
-        org.apache.kylin.job.streaming.StreamingCLI $@ > 
${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > 
${KYLIN_HOME}/logs/$3_$4 &
+        org.apache.kylin.engine.streaming.cli.StreamingCLI $@ > 
${KYLIN_HOME}/logs/streaming_$3_$4.log 2>&1 & echo $! > 
${KYLIN_HOME}/logs/$3_$4 &
         echo "streaming started name: $3 id: $4"
         exit 0
     elif [ $2 == "stop" ]
@@ -170,7 +170,7 @@ then
     -Dkylin.hive.dependency=${hive_dependency} \
     -Dkylin.hbase.dependency=${hbase_dependency} \
     -Dspring.profiles.active=${spring_profile} \
-    org.apache.kylin.job.monitor.MonitorCLI $@ >> 
${KYLIN_HOME}/logs/monitor.log 2>&1
+    org.apache.kylin.engine.streaming.cli.MonitorCLI $@ > 
${KYLIN_HOME}/logs/monitor.log 2>&1
     exit 0
 else
     echo "usage: kylin.sh start or kylin.sh stop"

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/build/script/compress.sh
----------------------------------------------------------------------
diff --git a/build/script/compress.sh b/build/script/compress.sh
index a424b98..c70e567 100755
--- a/build/script/compress.sh
+++ b/build/script/compress.sh
@@ -21,6 +21,7 @@ rm -rf lib tomcat commit_SHA1
 find kylin-${version} -type d -exec chmod 755 {} \;
 find kylin-${version} -type f -exec chmod 644 {} \;
 find kylin-${version} -type f -name "*.sh" -exec chmod 755 {} \;
+mkdir -p ../dist
 tar -cvzf ../dist/kylin-${version}.tar.gz kylin-${version}
 rm -rf kylin-${version}
 

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
deleted file mode 100644
index 8bf52c1..0000000
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/StreamingCLI.java
+++ /dev/null
@@ -1,99 +0,0 @@
-/*
- *
- *
- *  Licensed to the Apache Software Foundation (ASF) under one or more
- *
- *  contributor license agreements. See the NOTICE file distributed with
- *
- *  this work for additional information regarding copyright ownership.
- *
- *  The ASF licenses this file to You under the Apache License, Version 2.0
- *
- *  (the "License"); you may not use this file except in compliance with
- *
- *  the License. You may obtain a copy of the License at
- *
- *
- *
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- *
- *
- *  Unless required by applicable law or agreed to in writing, software
- *
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *
- *  See the License for the specific language governing permissions and
- *
- *  limitations under the License.
- *
- * /
- */
-
-package org.apache.kylin.engine.streaming;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.cache.RemoteCacheUpdater;
-import org.apache.kylin.common.restclient.AbstractRestCache;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-public class StreamingCLI {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamingCLI.class);
-
-    public static void main(String[] args) {
-        try {
-            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
-
-            Preconditions.checkArgument(args[0].equals("streaming"));
-            Preconditions.checkArgument(args[1].equals("start"));
-
-            int i = 2;
-            BootstrapConfig bootstrapConfig = new BootstrapConfig();
-            while (i < args.length) {
-                String argName = args[i];
-                switch (argName) {
-                case "-oneoff":
-                    bootstrapConfig.setOneOff(Boolean.parseBoolean(args[++i]));
-                    break;
-                case "-start":
-                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
-                    break;
-                case "-end":
-                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
-                    break;
-                case "-streaming":
-                    bootstrapConfig.setStreaming(args[++i]);
-                    break;
-                case "-partition":
-                    
bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
-                    break;
-                case "-fillGap":
-                    
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
-                    break;
-                default:
-                    logger.warn("ignore this arg:" + argName);
-                }
-                i++;
-            }
-            final Runnable runnable = new 
OneOffStreamingBuilder(bootstrapConfig.getStreaming(), 
bootstrapConfig.getStart(), bootstrapConfig.getEnd()).build();
-            runnable.run();
-            logger.info("streaming process stop, exit with 0");
-            System.exit(0);
-        } catch (Exception e) {
-            printArgsError(args);
-            logger.error("error start streaming", e);
-            System.exit(-1);
-        }
-    }
-
-    private static void printArgsError(String[] args) {
-        logger.warn("invalid args:" + StringUtils.join(args, " "));
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
new file mode 100644
index 0000000..d7dc6b3
--- /dev/null
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/MonitorCLI.java
@@ -0,0 +1,70 @@
+package org.apache.kylin.engine.streaming.cli;
+
+import java.util.List;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class MonitorCLI {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MonitorCLI.class);
+
+    public static void main(String[] args) {
+        Preconditions.checkArgument(args[0].equals("monitor"));
+
+        int i = 1;
+        List<String> receivers = null;
+        String host = null;
+        String tableName = null;
+        String authorization = null;
+        String cubeName = null;
+        String projectName = "default";
+        while (i < args.length) {
+            String argName = args[i];
+            switch (argName) {
+            case "-receivers":
+                receivers = Lists.newArrayList(StringUtils.split(args[++i], 
";"));
+                break;
+            case "-host":
+                host = args[++i];
+                break;
+            case "-tableName":
+                tableName = args[++i];
+                break;
+            case "-authorization":
+                authorization = args[++i];
+                break;
+            case "-cubeName":
+                cubeName = args[++i];
+                break;
+            case "-projectName":
+                projectName = args[++i];
+                break;
+            default:
+                throw new RuntimeException("invalid argName:" + argName);
+            }
+            i++;
+        }
+        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
+        final StreamingMonitor streamingMonitor = new StreamingMonitor();
+        if (tableName != null) {
+            logger.info(String.format("check query tableName:%s host:%s 
receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
+            Preconditions.checkNotNull(host);
+            Preconditions.checkNotNull(authorization);
+            Preconditions.checkNotNull(tableName);
+            streamingMonitor.checkCountAll(receivers, host, authorization, 
projectName, tableName);
+        }
+        if (cubeName != null) {
+            logger.info(String.format("check cube cubeName:%s receivers:%s", 
cubeName, StringUtils.join(receivers, ";")));
+            streamingMonitor.checkCube(receivers, cubeName,host);
+        }
+        System.exit(0);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
new file mode 100644
index 0000000..a4ccabc
--- /dev/null
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/cli/StreamingCLI.java
@@ -0,0 +1,120 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.engine.streaming.cli;
+
+import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.cache.RemoteCacheUpdater;
+import org.apache.kylin.common.restclient.AbstractRestCache;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.engine.streaming.BootstrapConfig;
+import org.apache.kylin.engine.streaming.OneOffStreamingBuilder;
+import org.apache.kylin.engine.streaming.StreamingConfig;
+import org.apache.kylin.engine.streaming.StreamingManager;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class StreamingCLI {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingCLI.class);
+
+    public static void main(String[] args) {
+        try {
+            AbstractRestCache.setCacheUpdater(new RemoteCacheUpdater());
+
+            Preconditions.checkArgument(args[0].equals("streaming"));
+            Preconditions.checkArgument(args[1].equals("start"));
+
+            int i = 2;
+            BootstrapConfig bootstrapConfig = new BootstrapConfig();
+            while (i < args.length) {
+                String argName = args[i];
+                switch (argName) {
+                case "-oneoff":
+                    Boolean.parseBoolean(args[++i]);
+                    break;
+                case "-start":
+                    bootstrapConfig.setStart(Long.parseLong(args[++i]));
+                    break;
+                case "-end":
+                    bootstrapConfig.setEnd(Long.parseLong(args[++i]));
+                    break;
+                case "-streaming":
+                    bootstrapConfig.setStreaming(args[++i]);
+                    break;
+                case "-partition":
+                    
bootstrapConfig.setPartitionId(Integer.parseInt(args[++i]));
+                    break;
+                case "-fillGap":
+                    
bootstrapConfig.setFillGap(Boolean.parseBoolean(args[++i]));
+                    break;
+                default:
+                    logger.warn("ignore this arg:" + argName);
+                }
+                i++;
+            }
+            if (bootstrapConfig.isFillGap()) {
+                final StreamingConfig streamingConfig = 
StreamingManager.getInstance(KylinConfig.getInstanceFromEnv()).getStreamingConfig(bootstrapConfig.getStreaming());
+                final List<Pair<Long, Long>> gaps = 
StreamingMonitor.findGaps(streamingConfig.getCubeName());
+                logger.info("all gaps:" + StringUtils.join(gaps, ","));
+                for (Pair<Long, Long> gap : gaps) {
+                    startOneOffCubeStreaming(bootstrapConfig.getStreaming(), 
gap.getFirst(), gap.getSecond());
+                }
+            } else {
+                startOneOffCubeStreaming(bootstrapConfig.getStreaming(), 
bootstrapConfig.getStart(), bootstrapConfig.getEnd());
+                logger.info("streaming process finished, exit with 0");
+                System.exit(0);
+            }
+        } catch (Exception e) {
+            printArgsError(args);
+            logger.error("error start streaming", e);
+            System.exit(-1);
+        }
+    }
+    
+    private static void startOneOffCubeStreaming(String streaming, long start, 
long end) {
+        final Runnable runnable = new OneOffStreamingBuilder(streaming, start, 
end).build();
+        runnable.run();
+    }
+
+    private static void printArgsError(String[] args) {
+        logger.warn("invalid args:" + StringUtils.join(args, " "));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
new file mode 100644
index 0000000..a6b8a9f
--- /dev/null
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/monitor/StreamingMonitor.java
@@ -0,0 +1,154 @@
+package org.apache.kylin.engine.streaming.monitor;
+
+import java.io.PrintWriter;
+import java.io.StringWriter;
+import java.util.Collections;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+
+import javax.annotation.Nullable;
+
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
+import org.apache.commons.httpclient.methods.PostMethod;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.JsonUtil;
+import org.apache.kylin.common.util.MailService;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.metadata.model.SegmentStatusEnum;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Function;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+
+/**
+ */
+public class StreamingMonitor {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(StreamingMonitor.class);
+
+    public void checkCountAll(List<String> receivers, String host, String 
authorization, String projectName, String tableName) {
+        String title = "checkCountAll job(host:" + host + " tableName:" + 
tableName + ") ";
+        StringBuilder stringBuilder = new StringBuilder();
+        String url = host + "/kylin/api/query";
+        PostMethod request = new PostMethod(url);
+        try {
+
+            request.addRequestHeader("Authorization", "Basic " + 
authorization);
+            request.addRequestHeader("Content-Type", "application/json");
+            String query = String.format("{\"sql\":\"select count(*) from 
%s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", 
tableName, projectName);
+            request.setRequestEntity(new 
ByteArrayRequestEntity(query.getBytes()));
+
+            int statusCode = new HttpClient().executeMethod(request);
+            String msg = Bytes.toString(request.getResponseBody());
+            stringBuilder.append("host:").append(host).append("\n");
+            stringBuilder.append("query:").append(query).append("\n");
+            
stringBuilder.append("statusCode:").append(statusCode).append("\n");
+            if (statusCode == 200) {
+                title += "succeed";
+                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
+                
stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
+                
stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
+            } else {
+                title += "failed";
+                stringBuilder.append("response:").append(msg).append("\n");
+            }
+        } catch (Exception e) {
+            final StringWriter out = new StringWriter();
+            e.printStackTrace(new PrintWriter(out));
+            title += "failed";
+            stringBuilder.append(out.toString());
+        } finally {
+            request.releaseConnection();
+        }
+        logger.info("title:" + title);
+        logger.info("content:" + stringBuilder.toString());
+        sendMail(receivers, title, stringBuilder.toString());
+    }
+
+    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<Long, Long>> gaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
+                gaps.add(Pair.newPair(first.getDateRangeEnd(), 
second.getDateRangeStart()));
+            }
+        }
+        return gaps;
+    }
+
+    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
+        final CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        Preconditions.checkNotNull(cube);
+        final List<CubeSegment> segments = 
cube.getSegment(SegmentStatusEnum.READY);
+        logger.info("totally " + segments.size() + " cubeSegments");
+        Collections.sort(segments);
+        return segments;
+    }
+
+    public static final List<Pair<String, String>> findOverlaps(String 
cubeName) {
+        List<CubeSegment> segments = getSortedReadySegments(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        for (int i = 0; i < segments.size() - 1; ++i) {
+            CubeSegment first = segments.get(i);
+            CubeSegment second = segments.get(i + 1);
+            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
+                continue;
+            } else {
+                overlaps.add(Pair.newPair(first.getName(), second.getName()));
+            }
+        }
+        return overlaps;
+    }
+
+    public void checkCube(List<String> receivers, String cubeName, String 
host) {
+        final CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
+        if (cube == null) {
+            logger.info("cube:" + cubeName + " does not exist");
+            return;
+        }
+        List<Pair<Long, Long>> gaps = findGaps(cubeName);
+        List<Pair<String, String>> overlaps = Lists.newArrayList();
+        StringBuilder content = new StringBuilder();
+        if (!gaps.isEmpty()) {
+            content.append("all 
gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new 
Function<Pair<Long, Long>, String>() {
+                @Nullable
+                @Override
+                public String apply(Pair<Long, Long> input) {
+                    return parseInterval(input);
+                }
+            }), "\n")).append("\n");
+        }
+        if (!overlaps.isEmpty()) {
+            content.append("all 
overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
+        }
+        if (content.length() > 0) {
+            logger.info(content.toString());
+            sendMail(receivers, String.format("%s has gaps or overlaps on host 
%s", cubeName, host), content.toString());
+        } else {
+            logger.info("no gaps or overlaps");
+        }
+    }
+
+    private String parseInterval(Pair<Long, Long> interval) {
+        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new 
Date(interval.getFirst()).toString(), interval.getSecond(), new 
Date(interval.getSecond()).toString());
+    }
+
+    private void sendMail(List<String> receivers, String title, String 
content) {
+        final MailService mailService = new 
MailService(KylinConfig.getInstanceFromEnv());
+        mailService.sendMail(receivers, title, content, false);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
----------------------------------------------------------------------
diff --git 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
index 718fc43..47db924 100644
--- 
a/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
+++ 
b/engine-streaming/src/main/java/org/apache/kylin/engine/streaming/util/StreamingUtils.java
@@ -53,7 +53,7 @@ public class StreamingUtils {
     }
 
     public static IStreamingOutput getStreamingOutput(String streaming) {
-        return (IStreamingOutput) 
ClassUtil.newInstance("org.apache.kylin.storage.hbase.HBaseStreamingOutput");
+        return (IStreamingOutput) 
ClassUtil.newInstance("org.apache.kylin.storage.hbase.steps.HBaseStreamingOutput");
     }
 
     public static StreamingBatchBuilder getMicroBatchBuilder(String streaming) 
{

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
new file mode 100644
index 0000000..5883493
--- /dev/null
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/ByteBufferBackedInputStream.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.kafka;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ */
+class ByteBufferBackedInputStream extends InputStream {
+
+    private ByteBuffer buf;
+
+    public ByteBufferBackedInputStream(ByteBuffer buf) {
+        this.buf = buf;
+    }
+
+    @Override
+    public int read() throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+        return buf.get() & 0xFF;
+    }
+
+    @Override
+    public int read(byte[] bytes, int off, int len)
+            throws IOException {
+        if (!buf.hasRemaining()) {
+            return -1;
+        }
+
+        len = Math.min(len, buf.remaining());
+        buf.get(bytes, off, len);
+        return len;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
index 393b8e7..09dee50 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaStreamingInput.java
@@ -79,6 +79,7 @@ public class KafkaStreamingInput implements IStreamingInput {
     @Override
     public StreamingBatch getBatchWithTimeWindow(String streaming, int id, 
long startTime, long endTime) {
         try {
+            logger.info(String.format("prepare to get streaming batch, 
name:%s, id:%d, startTime:%d, endTime:%d", streaming, id, startTime, endTime));
             final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv();
             final KafkaConfigManager kafkaConfigManager = 
KafkaConfigManager.getInstance(kylinConfig);
             final KafkaConfig kafkaConfig = 
kafkaConfigManager.getStreamingConfig(streaming);
@@ -106,6 +107,7 @@ public class KafkaStreamingInput implements IStreamingInput 
{
                 }
             }
             final Pair<Long, Long> timeRange = Pair.newPair(startTime, 
endTime);
+            logger.info("finish to get streaming batch, total message count:" 
+ messages.size());
             return new StreamingBatch(messages, timeRange);
         } catch (ReflectiveOperationException e) {
             throw new RuntimeException("failed to create instance of 
StreamingParser", e);
@@ -220,8 +222,8 @@ public class KafkaStreamingInput implements IStreamingInput 
{
         });
         if (!StringUtils.isEmpty(kafkaConfig.getParserName())) {
             Class clazz = Class.forName(kafkaConfig.getParserName());
-            Constructor constructor = clazz.getConstructor(List.class);
-            return (StreamingParser) constructor.newInstance(columns);
+            Constructor constructor = clazz.getConstructor(List.class, 
String.class);
+            return (StreamingParser) constructor.newInstance(columns, 
kafkaConfig.getParserProperties());
         } else {
             throw new IllegalStateException("invalid StreamingConfig:" + 
kafkaConfig.getName() + " missing property StreamingParser");
         }

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
new file mode 100644
index 0000000..9b5071b
--- /dev/null
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/TimedJsonStreamParser.java
@@ -0,0 +1,142 @@
+/*
+ *
+ *
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *
+ *  contributor license agreements. See the NOTICE file distributed with
+ *
+ *  this work for additional information regarding copyright ownership.
+ *
+ *  The ASF licenses this file to You under the Apache License, Version 2.0
+ *
+ *  (the "License"); you may not use this file except in compliance with
+ *
+ *  the License. You may obtain a copy of the License at
+ *
+ *
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ *  See the License for the specific language governing permissions and
+ *
+ *  limitations under the License.
+ *
+ * /
+ */
+
+package org.apache.kylin.source.kafka;
+
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.MapType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.google.common.collect.Lists;
+import kafka.message.MessageAndOffset;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.kylin.common.util.DateFormat;
+import org.apache.kylin.common.util.TimeUtil;
+import org.apache.kylin.engine.streaming.StreamingMessage;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.*;
+
+/**
+ * Parser for JSON stream messages that each carry a "timestamp" field
+ */
+public final class TimedJsonStreamParser implements StreamingParser {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(TimedJsonStreamParser.class);
+
+    private List<TblColRef> allColumns;
+    private boolean formatTs = false;
+    private final ObjectMapper mapper = new ObjectMapper();
+    private String tsColName = "timestamp";
+    private final JavaType mapType = MapType.construct(HashMap.class, 
SimpleType.construct(String.class), SimpleType.construct(String.class));
+
+    public TimedJsonStreamParser(List<TblColRef> allColumns, String 
propertiesStr) {
+        this.allColumns = allColumns;
+        if (!StringUtils.isEmpty(propertiesStr)) {
+            String[] properties = propertiesStr.split(";");
+            for (String prop : properties) {
+                try {
+                    String[] parts = prop.split("=");
+                    if (parts.length == 2) {
+                        switch (parts[0]) {
+                        case "formatTs":
+                            this.formatTs = Boolean.valueOf(parts[1]);
+                            break;
+                        case "tsColName":
+                            this.tsColName = parts[1];
+                            break;
+                        default:
+                            break;
+                        }
+                    }
+                } catch (Exception e) {
+                    logger.error("Failed to parse property " + prop);
+                    //ignore
+                }
+            }
+        }
+
+        logger.info("TimedJsonStreamParser with formatTs {} tsColName {}", 
formatTs, tsColName);
+    }
+
+    @Override
+    public StreamingMessage parse(MessageAndOffset messageAndOffset) {
+        try {
+            Map<String, String> root = mapper.readValue(new 
ByteBufferBackedInputStream(messageAndOffset.message().payload()), mapType);
+            String tsStr = root.get(tsColName);
+            //Preconditions.checkArgument(!StringUtils.isEmpty(tsStr), 
"Timestamp field " + tsColName + //
+            //" cannot be null, the message offset is " + 
messageAndOffset.getOffset() + " content is " + new 
String(messageAndOffset.getRawData()));
+            long t;
+            if (StringUtils.isEmpty(tsStr)) {
+                t = 0;
+            } else {
+                t = Long.valueOf(tsStr);
+            }
+            ArrayList<String> result = Lists.newArrayList();
+
+            for (TblColRef column : allColumns) {
+                String columnName = column.getName();
+                if (columnName.equalsIgnoreCase("minute_start")) {
+                    long minuteStart = TimeUtil.getMinuteStart(t);
+                    result.add(formatTs ? 
DateFormat.formatToTimeStr(minuteStart) : String.valueOf(minuteStart));
+                } else if (columnName.equalsIgnoreCase("hour_start")) {
+                    long hourStart = TimeUtil.getHourStart(t);
+                    result.add(formatTs ? 
DateFormat.formatToTimeStr(hourStart) : String.valueOf(hourStart));
+                } else if (columnName.equalsIgnoreCase("day_start")) {
+                    //for day start we'll add yyyy-mm-dd
+                    long ts = TimeUtil.getDayStart(t);
+                    result.add(DateFormat.formatToDateStr(ts));
+                } else {
+                    String x = root.get(columnName.toLowerCase());
+                    result.add(x);
+                }
+            }
+
+            return new StreamingMessage(result, messageAndOffset.offset(), t, 
Collections.<String, Object>emptyMap());
+
+        } catch (IOException e) {
+            logger.error("error", e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    @Override
+    public boolean filter(StreamingMessage streamingMessage) {
+        return true;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
----------------------------------------------------------------------
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
index b56231a..1aff0ce 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/config/KafkaConfig.java
@@ -81,6 +81,10 @@ public class KafkaConfig extends RootPersistentEntity {
     @JsonProperty("margin")
     private long margin;
 
+    //"configA=1;configB=2"
+    @JsonProperty("parserProperties")
+    private String parserProperties;
+
     public List<KafkaClusterConfig> getKafkaClusterConfigs() {
         return kafkaClusterConfigs;
     }
@@ -141,6 +145,14 @@ public class KafkaConfig extends RootPersistentEntity {
         this.margin = margin;
     }
 
+    public String getParserProperties() {
+        return parserProperties;
+    }
+
+    public void setParserProperties(String parserProperties) {
+        this.parserProperties = parserProperties;
+    }
+
     @Override
     public KafkaConfig clone() {
         try {

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java 
b/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
deleted file mode 100644
index 7b9831a..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/MonitorCLI.java
+++ /dev/null
@@ -1,69 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class MonitorCLI {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(MonitorCLI.class);
-
-    public static void main(String[] args) {
-        Preconditions.checkArgument(args[0].equals("monitor"));
-
-        int i = 1;
-        List<String> receivers = null;
-        String host = null;
-        String tableName = null;
-        String authorization = null;
-        String cubeName = null;
-        String projectName = "default";
-        while (i < args.length) {
-            String argName = args[i];
-            switch (argName) {
-            case "-receivers":
-                receivers = Lists.newArrayList(StringUtils.split(args[++i], 
";"));
-                break;
-            case "-host":
-                host = args[++i];
-                break;
-            case "-tableName":
-                tableName = args[++i];
-                break;
-            case "-authorization":
-                authorization = args[++i];
-                break;
-            case "-cubeName":
-                cubeName = args[++i];
-                break;
-            case "-projectName":
-                projectName = args[++i];
-                break;
-            default:
-                throw new RuntimeException("invalid argName:" + argName);
-            }
-            i++;
-        }
-        Preconditions.checkArgument(receivers != null && receivers.size() > 0);
-        final StreamingMonitor streamingMonitor = new StreamingMonitor();
-        if (tableName != null) {
-            logger.info(String.format("check query tableName:%s host:%s 
receivers:%s", tableName, host, StringUtils.join(receivers, ";")));
-            Preconditions.checkNotNull(host);
-            Preconditions.checkNotNull(authorization);
-            Preconditions.checkNotNull(tableName);
-            streamingMonitor.checkCountAll(receivers, host, authorization, 
projectName, tableName);
-        }
-        if (cubeName != null) {
-            logger.info(String.format("check cube cubeName:%s receivers:%s", 
cubeName, StringUtils.join(receivers, ";")));
-            streamingMonitor.checkCube(receivers, cubeName,host);
-        }
-        System.exit(0);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java 
b/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
deleted file mode 100644
index e23f065..0000000
--- a/streaming/src/main/java/org/apache/kylin/job/monitor/StreamingMonitor.java
+++ /dev/null
@@ -1,154 +0,0 @@
-package org.apache.kylin.job.monitor;
-
-import java.io.PrintWriter;
-import java.io.StringWriter;
-import java.util.Collections;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.List;
-
-import javax.annotation.Nullable;
-
-import org.apache.commons.httpclient.HttpClient;
-import org.apache.commons.httpclient.methods.ByteArrayRequestEntity;
-import org.apache.commons.httpclient.methods.PostMethod;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.Bytes;
-import org.apache.kylin.common.util.JsonUtil;
-import org.apache.kylin.common.util.MailService;
-import org.apache.kylin.common.util.Pair;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.metadata.model.SegmentStatusEnum;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Function;
-import com.google.common.base.Preconditions;
-import com.google.common.collect.Lists;
-
-/**
- */
-public class StreamingMonitor {
-
-    private static final Logger logger = 
LoggerFactory.getLogger(StreamingMonitor.class);
-
-    public void checkCountAll(List<String> receivers, String host, String 
authorization, String projectName, String tableName) {
-        String title = "checkCountAll job(host:" + host + " tableName:" + 
tableName + ") ";
-        StringBuilder stringBuilder = new StringBuilder();
-        String url = host + "/kylin/api/query";
-        PostMethod request = new PostMethod(url);
-        try {
-
-            request.addRequestHeader("Authorization", "Basic " + 
authorization);
-            request.addRequestHeader("Content-Type", "application/json");
-            String query = String.format("{\"sql\":\"select count(*) from 
%s\",\"offset\":0,\"limit\":50000,\"acceptPartial\":true,\"project\":\"%s\"}", 
tableName, projectName);
-            request.setRequestEntity(new 
ByteArrayRequestEntity(query.getBytes()));
-
-            int statusCode = new HttpClient().executeMethod(request);
-            String msg = Bytes.toString(request.getResponseBody());
-            stringBuilder.append("host:").append(host).append("\n");
-            stringBuilder.append("query:").append(query).append("\n");
-            
stringBuilder.append("statusCode:").append(statusCode).append("\n");
-            if (statusCode == 200) {
-                title += "succeed";
-                final HashMap hashMap = JsonUtil.readValue(msg, HashMap.class);
-                
stringBuilder.append("results:").append(hashMap.get("results").toString()).append("\n");
-                
stringBuilder.append("duration:").append(hashMap.get("duration").toString()).append("\n");
-            } else {
-                title += "failed";
-                stringBuilder.append("response:").append(msg).append("\n");
-            }
-        } catch (Exception e) {
-            final StringWriter out = new StringWriter();
-            e.printStackTrace(new PrintWriter(out));
-            title += "failed";
-            stringBuilder.append(out.toString());
-        } finally {
-            request.releaseConnection();
-        }
-        logger.info("title:" + title);
-        logger.info("content:" + stringBuilder.toString());
-        sendMail(receivers, title, stringBuilder.toString());
-    }
-
-    public static final List<Pair<Long, Long>> findGaps(String cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<Long, Long>> gaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else if (first.getDateRangeEnd() < second.getDateRangeStart()) {
-                gaps.add(Pair.newPair(first.getDateRangeEnd(), 
second.getDateRangeStart()));
-            }
-        }
-        return gaps;
-    }
-
-    private static List<CubeSegment> getSortedReadySegments(String cubeName) {
-        final CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        Preconditions.checkNotNull(cube);
-        final List<CubeSegment> segments = 
cube.getSegment(SegmentStatusEnum.READY);
-        logger.info("totally " + segments.size() + " cubeSegments");
-        Collections.sort(segments);
-        return segments;
-    }
-
-    public static final List<Pair<String, String>> findOverlaps(String 
cubeName) {
-        List<CubeSegment> segments = getSortedReadySegments(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        for (int i = 0; i < segments.size() - 1; ++i) {
-            CubeSegment first = segments.get(i);
-            CubeSegment second = segments.get(i + 1);
-            if (first.getDateRangeEnd() == second.getDateRangeStart()) {
-                continue;
-            } else {
-                overlaps.add(Pair.newPair(first.getName(), second.getName()));
-            }
-        }
-        return overlaps;
-    }
-
-    public void checkCube(List<String> receivers, String cubeName, String 
host) {
-        final CubeInstance cube = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).reloadCubeLocal(cubeName);
-        if (cube == null) {
-            logger.info("cube:" + cubeName + " does not exist");
-            return;
-        }
-        List<Pair<Long, Long>> gaps = findGaps(cubeName);
-        List<Pair<String, String>> overlaps = Lists.newArrayList();
-        StringBuilder content = new StringBuilder();
-        if (!gaps.isEmpty()) {
-            content.append("all 
gaps:").append("\n").append(StringUtils.join(Lists.transform(gaps, new 
Function<Pair<Long, Long>, String>() {
-                @Nullable
-                @Override
-                public String apply(Pair<Long, Long> input) {
-                    return parseInterval(input);
-                }
-            }), "\n")).append("\n");
-        }
-        if (!overlaps.isEmpty()) {
-            content.append("all 
overlaps:").append("\n").append(StringUtils.join(overlaps, "\n")).append("\n");
-        }
-        if (content.length() > 0) {
-            logger.info(content.toString());
-            sendMail(receivers, String.format("%s has gaps or overlaps on host 
%s", cubeName, host), content.toString());
-        } else {
-            logger.info("no gaps or overlaps");
-        }
-    }
-
-    private String parseInterval(Pair<Long, Long> interval) {
-        return String.format("{%d(%s), %d(%s)}", interval.getFirst(), new 
Date(interval.getFirst()).toString(), interval.getSecond(), new 
Date(interval.getSecond()).toString());
-    }
-
-    private void sendMail(List<String> receivers, String title, String 
content) {
-        final MailService mailService = new 
MailService(KylinConfig.getInstanceFromEnv());
-        mailService.sendMail(receivers, title, content, false);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java 
b/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
deleted file mode 100644
index 95fbc9d..0000000
--- 
a/streaming/src/main/java/org/apache/kylin/job/streaming/KafkaDataLoader.java
+++ /dev/null
@@ -1,54 +0,0 @@
-package org.apache.kylin.job.streaming;
-
-import java.util.List;
-import java.util.Properties;
-
-import javax.annotation.Nullable;
-
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.streaming.BrokerConfig;
-import org.apache.kylin.streaming.KafkaClusterConfig;
-import org.apache.kylin.streaming.StreamingConfig;
-
-import com.google.common.base.Function;
-import com.google.common.collect.Collections2;
-import com.google.common.collect.Lists;
-
-/**
- * Load prepared data into kafka(for test use)
- */
-public class KafkaDataLoader {
-
-    public static void loadIntoKafka(StreamingConfig streamingConfig, 
List<String> messages) {
-
-        KafkaClusterConfig clusterConfig = 
streamingConfig.getKafkaClusterConfigs().get(0);
-        String brokerList = 
StringUtils.join(Collections2.transform(clusterConfig.getBrokerConfigs(), new 
Function<BrokerConfig, String>() {
-            @Nullable
-            @Override
-            public String apply(BrokerConfig brokerConfig) {
-                return brokerConfig.getHost() + ":" + brokerConfig.getPort();
-            }
-        }), ",");
-        Properties props = new Properties();
-        props.put("metadata.broker.list", brokerList);
-        props.put("serializer.class", "kafka.serializer.StringEncoder");
-        props.put("request.required.acks", "1");
-
-        ProducerConfig config = new ProducerConfig(props);
-
-        Producer<String, String> producer = new Producer<String, 
String>(config);
-
-        List<KeyedMessage<String, String>> keyedMessages = 
Lists.newArrayList();
-        for (int i = 0; i < messages.size(); ++i) {
-            KeyedMessage<String, String> keyedMessage = new 
KeyedMessage<String, String>(streamingConfig.getTopic(), String.valueOf(i), 
messages.get(i));
-            keyedMessages.add(keyedMessage);
-        }
-        producer.send(keyedMessages);
-        producer.close();
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-kylin/blob/55a85df0/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
 
b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
index 4212fea..551006f 100644
--- 
a/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
+++ 
b/streaming/src/main/java/org/apache/kylin/job/streaming/StreamingBootstrap.java
@@ -63,7 +63,7 @@ import org.apache.kylin.invertedindex.IIInstance;
 import org.apache.kylin.invertedindex.IIManager;
 import org.apache.kylin.invertedindex.IISegment;
 import org.apache.kylin.invertedindex.model.IIDesc;
-import org.apache.kylin.job.monitor.StreamingMonitor;
+import org.apache.kylin.engine.streaming.monitor.StreamingMonitor;
 import org.apache.kylin.metadata.model.IntermediateColumnDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.storage.hbase.HBaseConnection;

Reply via email to