This is an automated email from the ASF dual-hosted git repository.

nic pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new 0081e58  KYLIN-4371 Integrate System Cube with Real-time OLAP (#1102)
0081e58 is described below

commit 0081e5844fa8badc7588ac2ffb2875e047608516
Author: Xiaoxiang Yu <hit_la...@126.com>
AuthorDate: Wed Feb 26 22:49:04 2020 +0800

    KYLIN-4371 Integrate System Cube with Real-time OLAP (#1102)
    
    * Refine metrics system
    
    * KYLIN-4371 Update System Cube Metadata Creator
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   5 +
 .../apache/kylin/job/metrics/JobMetricsFacade.java |   9 +-
 .../org/apache/kylin/metrics/MetricsManager.java   |   2 +-
 .../apache/kylin/metrics/lib/ActiveReservoir.java  |  15 ++-
 ...ilter.java => ActiveReservoirRecordFilter.java} |  21 +++-
 .../kylin/metrics/lib/ActiveReservoirReporter.java |   4 +-
 .../java/org/apache/kylin/metrics/lib/Record.java  |   6 +-
 .../java/org/apache/kylin/metrics/lib/Sink.java    |   2 +-
 .../org/apache/kylin/metrics/lib/SinkTool.java     |  32 -----
 .../metrics/lib/impl/BaseScheduledReporter.java    |   3 +-
 .../kylin/metrics/lib/impl/InstantReservoir.java   |   8 +-
 .../kylin/metrics/lib/impl/MetricsSystem.java      |   8 +-
 .../apache/kylin/metrics/lib/impl/RecordEvent.java |  57 +++++----
 .../metrics/lib/impl/RecordEventTimeDetail.java    |   5 +-
 .../kylin/metrics/lib/impl/ReporterBuilder.java    |   8 +-
 .../kylin/metrics/lib/impl/TimePropertyEnum.java   |  11 +-
 .../kylin/metrics/property/JobPropertyEnum.java    |   1 +
 .../kylin/metrics/property/QueryPropertyEnum.java  |   1 +
 .../kylin/metrics/lib/impl/hive/HiveProducer.java  |   2 +-
 .../lib/impl/hive/HiveReservoirReporter.java       |   6 +-
 .../impl/kafka/KafkaActiveReserviorListener.java   |  33 ++---
 .../lib/impl/kafka/KafkaReservoirReporter.java     |   4 +-
 .../kylin/rest/metrics/QueryMetricsFacade.java     |  12 +-
 .../tool/metrics/systemcube/CubeDescCreator.java   |  42 +++----
 .../metrics/systemcube/CubeInstanceCreator.java    |  26 ++--
 .../tool/metrics/systemcube/HiveTableCreator.java  |   2 +-
 .../tool/metrics/systemcube/KylinTableCreator.java |  34 +++--
 .../tool/metrics/systemcube/ModelCreator.java      |  26 ++--
 .../kylin/tool/metrics/systemcube/SCCreator.java   | 139 +++++++++++++--------
 .../HiveSinkTool.java => def/MetricsSinkDesc.java} |  46 ++++++-
 .../systemcube/streamingv2/KafkaTopicCreator.java  |  46 +++++++
 .../streamingv2/StreamingMetadataCreator.java      | 100 +++++++++++++++
 tool/src/main/resources/SCSinkTools.json           |  33 +++--
 .../tool/metrics/systemcube/SCCreatorTest.java     |  50 +++-----
 34 files changed, 505 insertions(+), 294 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 7d5cda8..09f7c8a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -2208,6 +2208,11 @@ public abstract class KylinConfigBase implements 
Serializable {
         return getPropertiesByPrefix("kylin.metrics.");
     }
 
+    public int printSampleEventRatio(){
+        String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000");
+        return Integer.parseInt(val);
+    }
+
     // 
============================================================================
     // tool
     // 
============================================================================
diff --git 
a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java 
b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
index e9d41bf..5d5c607 100644
--- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
+++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java
@@ -41,13 +41,13 @@ public class JobMetricsFacade {
             setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, 
jobStats.cubeName, jobStats.jobId,
                     jobStats.jobType, jobStats.cubingType);
             setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, 
jobStats.buildDuration,
-                    jobStats.waitResourceTime, jobStats.perBytesTimeCost, //
+                    jobStats.waitResourceTime, jobStats.perBytesTimeCost,
                     jobStats.dColumnDistinct, jobStats.dDictBuilding, 
jobStats.dCubingInmem, jobStats.dHfileConvert);
         } else {
             metricsEvent = new 
TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException());
             setJobExceptionWrapper(metricsEvent, jobStats.user, 
jobStats.projectName, jobStats.cubeName, jobStats.jobId,
                     jobStats.jobType, jobStats.cubingType, //
-                    jobStats.throwable.getClass());
+                    jobStats.throwable);
         }
         MetricsManager.getInstance().update(metricsEvent);
     }
@@ -78,9 +78,10 @@ public class JobMetricsFacade {
 
     private static <T extends Throwable> void 
setJobExceptionWrapper(RecordEvent metricsEvent, String user,
             String projectName, String cubeName, String jobId, String jobType, 
String cubingType,
-            Class<T> throwableClass) {
+            Throwable throwable) {
         setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, 
jobType, cubingType);
-        metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), 
throwableClass.getName());
+        metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), 
throwable.getClass().getName());
+        metricsEvent.put(JobPropertyEnum.EXCEPTION_MSG.toString(), 
throwable.getMessage());
     }
 
     public static class JobStatisticsResult {
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
index a5f1088..c0fce0b 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java
@@ -131,7 +131,7 @@ public class MetricsManager {
                                 .start();
                     } catch (Exception e) {
                         logger.warn("Cannot initialize 
ActiveReservoirReporter: Builder class - " + subEntry.getFirst()
-                                + ", Properties - " + subEntry.getSecond());
+                                + ", Properties - " + subEntry.getSecond(), e);
                     }
                 }
                 Metrics.register(registerName, activeReservoir);
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
index 542a781..1a3c8db 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java
@@ -21,24 +21,24 @@ package org.apache.kylin.metrics.lib;
 import java.io.Closeable;
 
 /**
- * Reservoir for mertics message, a Reservoir(something like cache)'s duty is 
store mertics message temporarily
- * and emit messages to external Sink by notifying specific 
ActiveReservoirListener.
+ * Reservoir for metrics events; a Reservoir (something like a cache/buffer) 
is responsible for storing metrics events temporarily
+ * and emitting them to an external Sink by notifying a specific 
ActiveReservoirListener.
  */
 public interface ActiveReservoir extends Closeable {
 
     /**
-     * @return how many mertics message was currently cached(not emit)
+     * @return how many metrics messages are currently cached (not yet emitted)
      */
     int size();
 
     /**
      * stage metrics message into Reservoir, but whether to emit it to 
external storage
-     * immediately is decided by specific implemention
+     * immediately is decided by specific implementation
      */
     void update(Record record);
 
     /**
-     * add listener which responsed to message update
+     * add a listener which is in charge of sending metrics event updates
      */
     void addListener(ActiveReservoirListener listener);
 
@@ -46,10 +46,13 @@ public interface ActiveReservoir extends Closeable {
 
     void removeAllListener();
 
+    /**
+     * A backup listener; it will be called when one of the previous listeners 
fails.
+     */
     void setHAListener(ActiveReservoirListener listener);
 
     /**
-     * do some prepare to accept metrics message
+     * do some preparation to accept metrics events
      */
     void start();
 
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java
similarity index 72%
rename from 
core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
rename to 
core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java
index 5cffcfc..5b731e5 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java
@@ -14,19 +14,19 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib;
 
 /**
  * A filter used to determine whether or not an active reservoir should be 
reported, among other things.
  */
-public interface ActiveReservoirFilter {
+public interface ActiveReservoirRecordFilter {
 
     /**
      * Matches all active reservoirs, regardless of type or name.
      */
-    ActiveReservoirFilter ALL = new ActiveReservoirFilter() {
+    ActiveReservoirRecordFilter ALL = new ActiveReservoirRecordFilter() {
         @Override
         public boolean matches(String name, ActiveReservoir activeReservoir) {
             return true;
@@ -36,9 +36,18 @@ public interface ActiveReservoirFilter {
     /**
      * Returns {@code true} if the active reservoir matches the filter; {@code 
false} otherwise.
      *
-     * @param name      the active reservoir's name
-     * @param activeReservoir    the active reservoir
+     * @param name            the active reservoir's name
+     * @param activeReservoir the active reservoir
      * @return {@code true} if the active reservoir matches the filter
      */
-    boolean matches(String name, ActiveReservoir activeReservoir);
+    default boolean matches(String name, ActiveReservoir activeReservoir) {
+        return true;
+    }
+
+    /**
+     * This method is used to check whether to filter a Record on the listener 
side
+     */
+    default boolean checkRecord(Record record) {
+        return true;
+    }
 }
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
index fa807e8..9b13018 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib;
 
@@ -27,7 +27,7 @@ import org.apache.kylin.common.util.Pair;
 import com.google.common.base.Strings;
 
 /**
- * ActiveReservoirReporter report metrics message from ActiveReservoir
+ * ActiveReservoirReporter reports metrics events from an ActiveReservoir via 
its listener
  */
 public abstract class ActiveReservoirReporter implements Closeable {
 
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
index 112951ad..0ce6d74 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java
@@ -21,14 +21,14 @@ package org.apache.kylin.metrics.lib;
 import java.util.Map;
 
 /**
- * Metrics message
+ * Metrics event in Metrics System
  */
 public interface Record {
 
     /**
-     *  For classification
+     *  For classification; will be mapped to a Hive Table or Kafka Topic
      */
-    String getType();
+    String getSubject();
 
     /**
      *  For keep ordering in the same category
diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
index a04356d..681c131 100644
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
+++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java
@@ -19,7 +19,7 @@
 package org.apache.kylin.metrics.lib;
 
 /**
- * Sink is where mertics data will write to
+ * Sink is where metrics events will be written to
  */
 public interface Sink {
     String getTableFromSubject(String subject);
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java
deleted file mode 100644
index b55516a..0000000
--- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
-*/
-
-package org.apache.kylin.metrics.lib;
-
-import java.io.Serializable;
-import java.util.Map;
-
-public interface SinkTool extends Serializable {
-    int getStorageType();
-
-    int getSourceType();
-
-    String getTableNameForMetrics(String subject);
-
-    Map<String, String> getCubeDescOverrideProperties();
-}
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
index c006352..735a69c 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java
@@ -29,9 +29,8 @@ import org.slf4j.LoggerFactory;
 import com.google.common.util.concurrent.ThreadFactoryBuilder;
 
 /**
- * @deprecated abandon code which seems to be replaced by 
ActiveReservoirReporter, should be removed later
+ * Extension point for user-defined, fixed-rate metrics reporters
  */
-@Deprecated
 public abstract class BaseScheduledReporter implements Closeable {
 
     private static final Logger logger = 
LoggerFactory.getLogger(BaseScheduledReporter.class);
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
index 6bd9539..78de933 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java
@@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory;
 import com.google.common.collect.Lists;
 
 /**
- * A Reservoir which don't staged metrics message at all, emit them in no time.
+ * A Reservoir which doesn't stage metrics events at all; it emits them immediately.
  */
 public class InstantReservoir extends AbstractActiveReservoir {
 
@@ -52,8 +52,7 @@ public class InstantReservoir extends AbstractActiveReservoir 
{
         for (ActiveReservoirListener listener : listeners) {
             if (!notifyListenerOfUpdatedRecord(listener, record)) {
                 ifSucceed = false;
-                logger.warn(
-                        "It fails to notify listener " + listener.toString() + 
" of updated record " + Arrays.toString(record.getKey()));
+                logger.info("Fails to notify {} of record {}", listener, 
Arrays.toString(record.getKey()));
             }
         }
         if (!ifSucceed) {
@@ -68,8 +67,7 @@ public class InstantReservoir extends AbstractActiveReservoir 
{
     }
 
     private boolean notifyListenerHAOfUpdatedRecord(Record record) {
-        logger.info("The HA listener " + listenerHA.toString() + " for updated 
record " + Arrays.toString(record.getKey())
-                + " will be started");
+        logger.info("Use HA Listener {} to notify record {}", listenerHA, 
Arrays.toString(record.getKey()));
         if (!notifyListenerOfUpdatedRecord(listenerHA, record)) {
             logger.error("The HA listener also fails!!!");
             return false;
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
index feae5c5..1e46bce 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java
@@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metrics.lib.ActiveReservoir;
-import org.apache.kylin.metrics.lib.ActiveReservoirFilter;
+import org.apache.kylin.metrics.lib.ActiveReservoirRecordFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -98,7 +98,7 @@ public class MetricsSystem extends MetricRegistry {
      *
      * @param filter a filter
      */
-    public void removeActiveReservoirMatching(ActiveReservoirFilter filter) {
+    public void removeActiveReservoirMatching(ActiveReservoirRecordFilter 
filter) {
         for (Map.Entry<String, ActiveReservoir> entry : 
activeReservoirs.entrySet()) {
             if (filter.matches(entry.getKey(), entry.getValue())) {
                 removeActiveReservoir(entry.getKey());
@@ -123,7 +123,7 @@ public class MetricsSystem extends MetricRegistry {
      * @return all the active reservoirs in the metrics system
      */
     public SortedMap<String, ActiveReservoir> getActiveReservoirs() {
-        return getActiveReservoirs(ActiveReservoirFilter.ALL);
+        return getActiveReservoirs(ActiveReservoirRecordFilter.ALL);
     }
 
     /**
@@ -132,7 +132,7 @@ public class MetricsSystem extends MetricRegistry {
      * @param filter    the active reservoir filter to match
      * @return all the active reservoirs in the metrics system
      */
-    public SortedMap<String, ActiveReservoir> 
getActiveReservoirs(ActiveReservoirFilter filter) {
+    public SortedMap<String, ActiveReservoir> 
getActiveReservoirs(ActiveReservoirRecordFilter filter) {
         final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>();
         for (Map.Entry<String, ActiveReservoir> entry : 
activeReservoirs.entrySet()) {
             if (filter.matches(entry.getKey(), entry.getValue())) {
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
index 5118fc9..7f1d02d 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib.impl;
 
@@ -28,24 +28,32 @@ import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
-import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.metrics.lib.Record;
 
 import com.google.common.collect.Maps;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class RecordEvent implements Record, Map<String, Object>, Serializable {
 
-    private static final InternalThreadLocal<ByteArrayOutputStream> _localBaos 
= new InternalThreadLocal<ByteArrayOutputStream>();
+    private static final Logger logger = 
LoggerFactory.getLogger(RecordEvent.class);
+
+    private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new 
ThreadLocal<>();
+
+    public static final String LOCAL_HOSTNAME;
 
-    static String localHostname;
     static {
+        String localHostname1;
         try {
             InetAddress addr = InetAddress.getLocalHost();
-            localHostname = addr.getHostName() + ":" + addr.getHostAddress();
+            localHostname1 = addr.getHostName() + ":" + addr.getHostAddress();
+            logger.info("RecordEvent using hostname : {}.", localHostname1);
         } catch (UnknownHostException e) {
-            localHostname = "Unknown";
+            logger.info("Unexpected ", e);
+            localHostname1 = "Unknown";
         }
+        LOCAL_HOSTNAME = localHostname1;
     }
 
     private final Map<String, Object> backingMap;
@@ -55,11 +63,7 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
     }
 
     public RecordEvent(String eventType) {
-        this(eventType, localHostname);
-    }
-
-    public RecordEvent(String eventType, long time) {
-        this(eventType, localHostname, time);
+        this(eventType, LOCAL_HOSTNAME);
     }
 
     public RecordEvent(String eventType, String host) {
@@ -71,28 +75,27 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
     }
 
     /**
-     *
      * @param map
-     * @param eventType     mandatory   with null check
-     * @param host          mandatory   without null check
-     * @param time          mandatory   with null check
+     * @param eventType mandatory   with null check
+     * @param host      mandatory   without null check
+     * @param time      mandatory   with null check
      */
     public RecordEvent(Map<String, Object> map, String eventType, String host, 
long time) {
-        backingMap = map != null ? map : Maps.<String, Object> newHashMap();
+        backingMap = map != null ? map : Maps.<String, Object>newHashMap();
         setEventType(eventType);
         setHost(host);
         setTime(time);
     }
 
     public String getEventType() {
-        return (String) get(RecordReserveKeyEnum.TYPE.toString());
+        return (String) get(RecordReserveKeyEnum.EVENT_SUBJECT.toString());
     }
 
     private void setEventType(String eventType) {
         if (eventType == null) {
             throw new IllegalArgumentException("EventType cannot be null.");
         }
-        put(RecordReserveKeyEnum.TYPE.toString(), eventType);
+        put(RecordReserveKeyEnum.EVENT_SUBJECT.toString(), eventType);
     }
 
     public String getHost() {
@@ -202,7 +205,7 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
     }
 
     @Override
-    public String getType() {
+    public String getSubject() {
         return getEventType();
     }
 
@@ -211,20 +214,20 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
         return (getHost() + "-" + getTime() + "-" + 
getID()).getBytes(StandardCharsets.UTF_8);
     }
 
-    @Override
     /**
      * Event type and time does not belong to value part
      */
+    @Override
     public Map<String, Object> getValueRaw() {
         Map<String, Object> cloneMap = Maps.newHashMap(backingMap);
-        cloneMap.remove(RecordReserveKeyEnum.TYPE.toString());
+        cloneMap.remove(RecordReserveKeyEnum.EVENT_SUBJECT.toString());
         return cloneMap;
     }
 
-    @Override
     /**
      * Event type does not belong to value part, it's for classification
      */
+    @Override
     public byte[] getValue() {
         try {
             ByteArrayOutputStream baos = _localBaos.get();
@@ -248,11 +251,15 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
     }
 
     public enum RecordReserveKeyEnum {
-        TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP");
+
+        EVENT_SUBJECT("EVENT_TYPE")
+        , ID("EVENT_ID") // Not used currently
+        , HOST("HOST")
+        , TIME("KTIMESTAMP");
 
         private final String reserveKey;
 
-        private RecordReserveKeyEnum(String key) {
+        RecordReserveKeyEnum(String key) {
             this.reserveKey = key;
         }
 
@@ -263,7 +270,7 @@ public class RecordEvent implements Record, Map<String, 
Object>, Serializable {
 
         public RecordReserveKeyEnum getByKey(String key) {
             for (RecordReserveKeyEnum reserveKey : 
RecordReserveKeyEnum.values()) {
-                if (reserveKey.reserveKey.equals(key)) {
+                if (reserveKey.reserveKey.equalsIgnoreCase(key)) {
                     return reserveKey;
                 }
             }
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
index aa1d307..4c549bc 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java
@@ -24,12 +24,11 @@ import java.util.Locale;
 import java.util.TimeZone;
 
 import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.threadlocal.InternalThreadLocal;
 
 public class RecordEventTimeDetail {
     private static final TimeZone timeZone;
-    private static final InternalThreadLocal<SimpleDateFormat> 
dateFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>();
-    private static final InternalThreadLocal<SimpleDateFormat> 
timeFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>();
+    private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = 
new ThreadLocal<>();
+    private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = 
new ThreadLocal<>();
 
     static {
         timeZone = 
TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone());
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
index 22fadd3..0b6ef35 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib.impl;
 
@@ -23,9 +23,15 @@ import java.util.Properties;
 import org.apache.kylin.metrics.lib.ActiveReservoir;
 import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
 
+/**
+ * Builder of ActiveReservoirReporter
+ */
 public abstract class ReporterBuilder {
     protected final ActiveReservoir registry;
     protected final Properties props;
+    public static final String LISTENER_FILTER_CLASS = "listener.filter.class";
+    public static final String CALLBACK_URL = "callback.url";
+    public static final String LISTENER_FILTER_DEFAULT_CLASS = 
"org.apache.kylin.metrics.lib.impl.callback.CallbackActiveReservoirFilter";
 
     protected ReporterBuilder(ActiveReservoir activeReservoir) {
         this.registry = activeReservoir;
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
index c013b4c..b6e700e 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java
@@ -23,9 +23,14 @@ import java.util.Locale;
 import com.google.common.base.Strings;
 
 public enum TimePropertyEnum {
-    YEAR("KYEAR_BEGIN_DATE"), MONTH("KMONTH_BEGIN_DATE"), 
WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), DAY_DATE(
-            "KDAY_DATE"), DAY_TIME(
-                    "KDAY_TIME"), TIME_HOUR("KTIME_HOUR"), 
TIME_MINUTE("KTIME_MINUTE"), TIME_SECOND("KTIME_SECOND");
+    YEAR("KYEAR_BEGIN_DATE"),
+    MONTH("KMONTH_BEGIN_DATE"),
+    WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"),
+    DAY_DATE("KDAY_DATE"),
+    DAY_TIME("KDAY_TIME"),
+    TIME_HOUR("KTIME_HOUR"),
+    TIME_MINUTE("KTIME_MINUTE"),
+    TIME_SECOND("KTIME_SECOND");
 
     private final String propertyName;
 
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
index e55cf2b..cb36599 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java
@@ -33,6 +33,7 @@ public enum JobPropertyEnum {
     ALGORITHM("CUBING_TYPE"),
     STATUS("JOB_STATUS"),
     EXCEPTION("EXCEPTION"),
+    EXCEPTION_MSG("EXCEPTION_MSG"),
 
     SOURCE_SIZE("TABLE_SIZE"),
     CUBE_SIZE("CUBE_SIZE"),
diff --git 
a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
 
b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
index e9c51df..c83c669 100644
--- 
a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
+++ 
b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java
@@ -28,6 +28,7 @@ import com.google.common.base.Strings;
 public enum QueryPropertyEnum {
 
     ID_CODE("QUERY_HASH_CODE"),
+    SQL("QUERY_SQL"),
     TYPE("QUERY_TYPE"),
     USER("KUSER"),
     PROJECT("PROJECT"),
diff --git 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
index 5ab6c9f..c2ec1ec 100644
--- 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
+++ 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java
@@ -257,7 +257,7 @@ public class HiveProducer {
         partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(),
                 rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString());
 
-        return 
parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()),
 partitionKVs,
+        return 
parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getSubject()),
 partitionKVs,
                 rawValue);
     }
 
diff --git 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
index 8e8f3b6..9d93e99 100644
--- 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
+++ 
b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java
@@ -132,10 +132,10 @@ public class HiveReservoirReporter extends 
ActiveReservoirReporter {
             try {
                 Map<String, List<Record>> queues = new HashMap<>();
                 for (Record record : records) {
-                    List<Record> recordQueues = queues.get(record.getType());
+                    List<Record> recordQueues = 
queues.get(record.getSubject());
                     if (recordQueues == null) {
                         recordQueues = new ArrayList<>();
-                        queues.put(record.getType(), recordQueues);
+                        queues.put(record.getSubject(), recordQueues);
                     }
                     recordQueues.add(record);
                 }
@@ -153,7 +153,7 @@ public class HiveReservoirReporter extends 
ActiveReservoirReporter {
 
         public boolean onRecordUpdate(final Record record) {
             try {
-                HiveProducer producer = getProducer(record.getType());
+                HiveProducer producer = getProducer(record.getSubject());
                 producer.send(record);
             } catch (Exception e) {
                 logger.error(e.getMessage(), e);
diff --git 
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
 
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
index 311f3e3..df79c57 100644
--- 
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
+++ 
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java
@@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.kafka.clients.producer.Callback;
 import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.metrics.lib.ActiveReservoirListener;
 import org.apache.kylin.metrics.lib.Record;
 import org.slf4j.Logger;
@@ -33,19 +34,18 @@ public abstract class KafkaActiveReserviorListener 
implements ActiveReservoirLis
     public static final long TOPIC_AVAILABLE_TAG = 0L;
     protected static final Logger logger = 
LoggerFactory.getLogger(KafkaActiveReserviorListener.class);
     protected Long maxBlockMs = 1800000L;
-    protected int maxRecordForLogNum = 10000;
+    protected int maxRecordForLogNum = 
KylinConfig.getInstanceFromEnv().printSampleEventRatio();
     protected int maxRecordSkipForLogNum = 10000;
     protected ConcurrentHashMap<String, Long> topicsIfAvailable = new 
ConcurrentHashMap<>();
-    private int nRecord = 0;
-    private int nRecordSkip = 0;
-    private Callback produceCallback = new Callback() {
-        @Override
-        public void onCompletion(RecordMetadata metadata, Exception exception) 
{
-            if (exception != null) {
-                exception.printStackTrace();
-                return;
-            }
-            logger.info("topic:" + metadata.topic() + "; partition: " + 
metadata.partition() + "; offset: " + metadata.offset());
+    private long nRecord = 0;
+    private long nRecordSkip = 0;
+    private int threshold = Integer.min((int)(maxRecordForLogNum * 0.002), 25);
+
+    private Callback produceCallback = (RecordMetadata metadata, Exception 
exception) -> {
+        if(exception != null){
+            logger.warn("Unexpected exception.",  exception);
+        } else {
+            logger.debug("Topic:{} ; partition:{} ; offset:{} .", 
metadata.topic(), metadata.partition(), metadata.offset());
         }
     };
 
@@ -67,17 +67,19 @@ public abstract class KafkaActiveReserviorListener 
implements ActiveReservoirLis
     public boolean onRecordUpdate(final List<Record> records) {
         try {
             for (Record record : records) {
-                String topic = decorateTopic(record.getType());
+                String topic = decorateTopic(record.getSubject());
+                if (nRecord <= threshold) {
+                    logger.debug("Send record {} to topic : {}", record, 
topic);
+                }
                 if (!checkAvailable(topic)) {
                     if (nRecordSkip % maxRecordSkipForLogNum == 0) {
                         nRecordSkip = 0;
-                        logger.warn("Skip to send record to topic " + topic);
+                        logger.warn("Skip to send record to topic {}", topic);
                     }
                     nRecordSkip++;
                     continue;
                 }
                 if (nRecord % maxRecordForLogNum == 0) {
-                    nRecord = 0;
                     sendWrapper(topic, record, produceCallback);
                 } else {
                     sendWrapper(topic, record, null);
@@ -101,7 +103,7 @@ public abstract class KafkaActiveReserviorListener 
implements ActiveReservoirLis
                 topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG);
                 return true;
             } catch (org.apache.kafka.common.errors.TimeoutException e) {
-                logger.warn("Fail to fetch metadata for topic " + topic);
+                logger.warn("Fail to fetch metadata for topic " + topic, e);
                 setUnAvailable(topic);
                 return false;
             }
@@ -110,6 +112,7 @@ public abstract class KafkaActiveReserviorListener 
implements ActiveReservoirLis
     }
 
     protected void setUnAvailable(String topic) {
+        logger.debug("Cannot find topic {}", topic);
         topicsIfAvailable.put(topic, System.currentTimeMillis());
     }
 }
diff --git 
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
 
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
index a5ea3aa..a7b58a6 100644
--- 
a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
+++ 
b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.metrics.lib.impl.kafka;
 
@@ -58,7 +58,7 @@ public class KafkaReservoirReporter extends 
ActiveReservoirReporter {
         return new Builder(activeReservoir);
     }
 
-    private static String decorateTopic(String topic) {
+    public static String decorateTopic(String topic) {
         return ActiveReservoirReporter.KYLIN_PREFIX + "_" + 
KAFKA_REPORTER_SUFFIX + "_" + topic;
     }
 
diff --git 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
index 226166d..11f5f6e 100644
--- 
a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
+++ 
b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java
@@ -56,7 +56,7 @@ public class QueryMetricsFacade {
     private static final HashFunction hashFunc = Hashing.murmur3_128();
 
     private static boolean enabled = false;
-    private static ConcurrentHashMap<String, QueryMetrics> metricsMap = new 
ConcurrentHashMap<String, QueryMetrics>();
+    private static ConcurrentHashMap<String, QueryMetrics> metricsMap = new 
ConcurrentHashMap<>();
 
     public static void init() {
         enabled = KylinConfig.getInstanceFromEnv().getQueryMetricsEnabled();
@@ -66,7 +66,7 @@ public class QueryMetricsFacade {
         DefaultMetricsSystem.initialize("Kylin");
     }
 
-    public static long getSqlHashCode(String sql) {
+    private static long getSqlHashCode(String sql) {
         return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong();
     }
 
@@ -112,12 +112,11 @@ public class QueryMetricsFacade {
             //For update rpc level related metrics
             MetricsManager.getInstance().update(rpcMetricsEvent);
         }
-        long sqlHashCode = getSqlHashCode(sqlRequest.getSql());
         for (QueryContext.CubeSegmentStatisticsResult contextEntry : 
sqlResponse.getCubeSegmentStatisticsList()) {
             RecordEvent queryMetricsEvent = new TimedRecordEvent(
                     
KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery());
             setQueryWrapper(queryMetricsEvent, //
-                    user, sqlHashCode, sqlResponse.isStorageCacheUsed() ? 
"CACHE" : contextEntry.getQueryType(),
+                    user, sqlRequest.getSql(), 
sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(),
                     norm(sqlRequest.getProject()), 
contextEntry.getRealization(), contextEntry.getRealizationType(),
                     sqlResponse.getThrowable());
 
@@ -206,10 +205,11 @@ public class QueryMetricsFacade {
         metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), 
weightPerHit);
     }
 
-    private static void setQueryWrapper(RecordEvent metricsEvent, String user, 
long queryHashCode, String queryType,
+    private static void setQueryWrapper(RecordEvent metricsEvent, String user, 
String sql, String queryType,
             String projectName, String realizationName, int realizationType, 
Throwable throwable) {
         metricsEvent.put(QueryPropertyEnum.USER.toString(), user);
-        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), queryHashCode);
+        metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), 
getSqlHashCode(sql));
+        metricsEvent.put(QueryPropertyEnum.SQL.toString(), sql);
         metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType);
         metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName);
         metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), 
realizationName);
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
index f525e44..5f64da0 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java
@@ -40,7 +40,6 @@ import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IEngineAware;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.kylin.metadata.model.ParameterDesc;
-import org.apache.kylin.metrics.lib.SinkTool;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
@@ -51,11 +50,12 @@ import 
org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import com.google.common.collect.Sets;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 public class CubeDescCreator {
 
-    public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig 
config, SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
+    public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig 
config, MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery());
 
         //Set for dimensions
         List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery();
@@ -131,12 +131,12 @@ public class CubeDescCreator {
         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
 
-        return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), 
dimensionDescList, measureDescList,
-                rowKeyDesc, aggGroup, hBaseMapping, 
sinkTool.getCubeDescOverrideProperties());
+        return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), 
dimensionDescList, measureDescList,
+                rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc 
generateKylinCubeDescForMetricsQueryCube(KylinConfig config, SinkTool sinkTool) 
{
-        String tableName = 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
+    public static CubeDesc 
generateKylinCubeDescForMetricsQueryCube(KylinConfig config, MetricsSinkDesc 
sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube());
 
         //Set for dimensions
         List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQueryCube();
@@ -218,12 +218,12 @@ public class CubeDescCreator {
         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
 
-        return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), 
dimensionDescList, measureDescList,
-                rowKeyDesc, aggGroup, hBaseMapping, 
sinkTool.getCubeDescOverrideProperties());
+        return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), 
dimensionDescList, measureDescList,
+                rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig 
config, SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
+    public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig 
config, MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall());
 
         //Set for dimensions
         List<String> dimensions = 
ModelCreator.getDimensionsForMetricsQueryRPC();
@@ -288,12 +288,12 @@ public class CubeDescCreator {
         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
 
-        return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), 
dimensionDescList, measureDescList,
-                rowKeyDesc, aggGroup, hBaseMapping, 
sinkTool.getCubeDescOverrideProperties());
+        return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), 
dimensionDescList, measureDescList,
+                rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig 
config, SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob());
+    public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig 
config, MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJob());
 
         //Set for dimensions
         List<String> dimensions = ModelCreator.getDimensionsForMetricsJob();
@@ -368,12 +368,12 @@ public class CubeDescCreator {
         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
 
-        return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), 
dimensionDescList, measureDescList,
-                rowKeyDesc, aggGroup, hBaseMapping, 
sinkTool.getCubeDescOverrideProperties());
+        return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), 
dimensionDescList, measureDescList,
+                rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
-    public static CubeDesc 
generateKylinCubeDescForMetricsJobException(KylinConfig config, SinkTool 
sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException());
+    public static CubeDesc 
generateKylinCubeDescForMetricsJobException(KylinConfig config, MetricsSinkDesc 
sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJobException());
 
         //Set for dimensions
         List<String> dimensions = 
ModelCreator.getDimensionsForMetricsJobException();
@@ -432,8 +432,8 @@ public class CubeDescCreator {
         HBaseMappingDesc hBaseMapping = new HBaseMappingDesc();
         hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList));
 
-        return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), 
dimensionDescList, measureDescList,
-                rowKeyDesc, aggGroup, hBaseMapping, 
sinkTool.getCubeDescOverrideProperties());
+        return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), 
dimensionDescList, measureDescList,
+                rowKeyDesc, aggGroup, hBaseMapping, 
sinkDesc.getCubeDescOverrideProperties());
     }
 
     public static CubeDesc generateKylinCubeDesc(String tableName, int 
storageType,
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
index 0fcec3b..7d70bc2 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java
@@ -27,16 +27,14 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.metadata.model.Segments;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.metrics.lib.SinkTool;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 public class CubeInstanceCreator {
 
     public static void main(String[] args) throws Exception {
-        //        KylinConfig.setSandboxEnvIfPossible();
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        CubeInstance cubeInstance = 
generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new HiveSinkTool());
+        CubeInstance cubeInstance = 
generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new 
MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         CubeManager.CUBE_SERIALIZER.serialize(cubeInstance, dout);
@@ -46,31 +44,31 @@ public class CubeInstanceCreator {
     }
 
     public static CubeInstance generateKylinCubeInstanceForMetricsQuery(String 
owner, KylinConfig config,
-            SinkTool sinkTool) {
-        return generateKylinCubeInstance(owner, 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()));
+            MetricsSinkDesc sinkDesc) {
+        return generateKylinCubeInstance(owner, 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()));
     }
 
     public static CubeInstance 
generateKylinCubeInstanceForMetricsQueryCube(String owner, KylinConfig config,
-            SinkTool sinkTool) {
+            MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()));
+                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()));
     }
 
     public static CubeInstance 
generateKylinCubeInstanceForMetricsQueryRPC(String owner, KylinConfig config,
-            SinkTool sinkTool) {
+            MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()));
+                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()));
     }
 
     public static CubeInstance generateKylinCubeInstanceForMetricsJob(String 
owner, KylinConfig config,
-            SinkTool sinkTool) {
-        return generateKylinCubeInstance(owner, 
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob()));
+            MetricsSinkDesc sinkDesc) {
+        return generateKylinCubeInstance(owner, 
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJob()));
     }
 
     public static CubeInstance 
generateKylinCubeInstanceForMetricsJobException(String owner, KylinConfig 
config,
-            SinkTool sinkTool) {
+            MetricsSinkDesc sinkDesc) {
         return generateKylinCubeInstance(owner,
-                
sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()));
+                
sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()));
     }
 
     public static CubeInstance generateKylinCubeInstance(String owner, String 
tableName) {
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
index d07cd08..f6e4cfe 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java
@@ -39,7 +39,6 @@ import com.google.common.collect.Lists;
 public class HiveTableCreator {
 
     public static void main(String[] args) {
-        //        KylinConfig.setSandboxEnvIfPossible();
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
         System.out.println(generateAllSQL(config));
@@ -272,6 +271,7 @@ public class HiveTableCreator {
             return null;
         }
 
+        @Override
         public String toString() {
             return typeName;
         }
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
index c84df4a..76fa7bd 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java
@@ -30,18 +30,16 @@ import org.apache.kylin.metadata.model.ColumnDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metrics.MetricsManager;
 import org.apache.kylin.metrics.lib.ActiveReservoirReporter;
-import org.apache.kylin.metrics.lib.SinkTool;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 import com.google.common.collect.Lists;
 
 public class KylinTableCreator {
 
     public static void main(String[] args) throws Exception {
-        //        KylinConfig.setSandboxEnvIfPossible();
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new 
HiveSinkTool());
+        TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new 
MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         TableMetadataManager.TABLE_SERIALIZER.serialize(kylinTable, dout);
@@ -50,53 +48,53 @@ public class KylinTableCreator {
         System.out.println(buf.toString("UTF-8"));
     }
 
-    public static TableDesc generateKylinTableForMetricsQuery(KylinConfig 
kylinConfig, SinkTool sinkTool) {
+    public static TableDesc generateKylinTableForMetricsQuery(KylinConfig 
kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkTool, 
kylinConfig.getKylinMetricsSubjectQuery(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQuery(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig 
kylinConfig, SinkTool sinkTool) {
+    public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig 
kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkTool, 
kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig 
kylinConfig, SinkTool sinkTool) {
+    public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig 
kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkTool, 
kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
     }
 
-    public static TableDesc generateKylinTableForMetricsJob(KylinConfig 
kylinConfig, SinkTool sinkTool) {
+    public static TableDesc generateKylinTableForMetricsJob(KylinConfig 
kylinConfig, MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkTool, 
kylinConfig.getKylinMetricsSubjectJob(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectJob(), columns);
     }
 
-    public static TableDesc 
generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool 
sinkTool) {
+    public static TableDesc 
generateKylinTableForMetricsJobException(KylinConfig kylinConfig, 
MetricsSinkDesc sinkDesc) {
         List<Pair<String, String>> columns = Lists.newLinkedList();
         
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException());
         columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable());
-        return generateKylinTable(kylinConfig, sinkTool, 
kylinConfig.getKylinMetricsSubjectJobException(), columns);
+        return generateKylinTable(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectJobException(), columns);
     }
 
-    public static TableDesc generateKylinTable(KylinConfig kylinConfig, 
SinkTool sinkTool, String subject,
-            List<Pair<String, String>> columns) {
+    public static TableDesc generateKylinTable(KylinConfig kylinConfig, 
MetricsSinkDesc sinkDesc, String subject,
+                                               List<Pair<String, String>> 
columns) {
         TableDesc kylinTable = new TableDesc();
 
         Pair<String, String> tableNameSplits = ActiveReservoirReporter
-                .getTableNameSplits(sinkTool.getTableNameForMetrics(subject));
+                .getTableNameSplits(sinkDesc.getTableNameForMetrics(subject));
         kylinTable.setUuid(RandomUtil.randomUUID().toString());
         kylinTable.setDatabase(tableNameSplits.getFirst());
         kylinTable.setName(tableNameSplits.getSecond());
         kylinTable.setTableType(null);
         kylinTable.setLastModified(0L);
-        kylinTable.setSourceType(sinkTool.getSourceType());
+        kylinTable.setSourceType(sinkDesc.getSourceType());
 
         ColumnDesc[] columnDescs = new ColumnDesc[columns.size()];
         for (int i = 0; i < columns.size(); i++) {
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
index cc94411..6f4a18f 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java
@@ -29,14 +29,13 @@ import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.JoinTableDesc;
 import org.apache.kylin.metadata.model.ModelDimensionDesc;
 import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metrics.lib.SinkTool;
 import org.apache.kylin.metrics.lib.impl.RecordEvent;
 import org.apache.kylin.metrics.lib.impl.TimePropertyEnum;
 import org.apache.kylin.metrics.property.JobPropertyEnum;
 import org.apache.kylin.metrics.property.QueryCubePropertyEnum;
 import org.apache.kylin.metrics.property.QueryPropertyEnum;
 import org.apache.kylin.metrics.property.QueryRPCPropertyEnum;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 
 import com.google.common.collect.Lists;
 
@@ -45,10 +44,9 @@ public class ModelCreator {
     public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new 
JsonSerializer<>(DataModelDesc.class);
 
     public static void main(String[] args) throws Exception {
-        //        KylinConfig.setSandboxEnvIfPossible();
         KylinConfig config = KylinConfig.getInstanceFromEnv();
 
-        DataModelDesc kylinModel = generateKylinModelForMetricsQuery("ADMIN", 
config, new HiveSinkTool());
+        DataModelDesc kylinModel = generateKylinModelForMetricsQuery("ADMIN", 
config, new MetricsSinkDesc());
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         MODELDESC_SERIALIZER.serialize(kylinModel, dout);
@@ -66,36 +64,36 @@ public class ModelCreator {
     }
 
     public static DataModelDesc generateKylinModelForMetricsQuery(String 
owner, KylinConfig kylinConfig,
-            SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery());
+            MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery());
         return generateKylinModel(owner, tableName, 
getDimensionsForMetricsQuery(), getMeasuresForMetricsQuery(),
                 getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryCube(String 
owner, KylinConfig kylinConfig,
-            SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube());
+            MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube());
         return generateKylinModel(owner, tableName, 
getDimensionsForMetricsQueryCube(),
                 getMeasuresForMetricsQueryCube(), getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsQueryRPC(String 
owner, KylinConfig kylinConfig,
-            SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall());
+            MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall());
         return generateKylinModel(owner, tableName, 
getDimensionsForMetricsQueryRPC(), getMeasuresForMetricsQueryRPC(),
                 getPartitionDesc(tableName));
     }
 
     public static DataModelDesc generateKylinModelForMetricsJob(String owner, 
KylinConfig kylinConfig,
-            SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJob());
+            MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJob());
         return generateKylinModel(owner, tableName, 
getDimensionsForMetricsJob(), getMeasuresForMetricsJob(),
                 getPartitionDesc(tableName));
     }
 
     public static DataModelDesc 
generateKylinModelForMetricsJobException(String owner, KylinConfig kylinConfig,
-            SinkTool sinkTool) {
-        String tableName = 
sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJobException());
+            MetricsSinkDesc sinkDesc) {
+        String tableName = 
sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJobException());
         return generateKylinModel(owner, tableName, 
getDimensionsForMetricsJobException(),
                 getMeasuresForMetricsJobException(), 
getPartitionDesc(tableName));
     }
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
index 8a6c98c..a2809c0 100644
--- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
+++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java
@@ -14,28 +14,26 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.tool.metrics.systemcube;
 
-import java.io.BufferedInputStream;
 import java.io.BufferedWriter;
 import java.io.ByteArrayOutputStream;
 import java.io.DataOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
+import java.io.FileNotFoundException;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
-import java.util.Collection;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
+import com.fasterxml.jackson.core.type.TypeReference;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
+import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.persistence.RootPersistentEntity;
 import org.apache.kylin.common.persistence.Serializer;
@@ -50,14 +48,19 @@ import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.metadata.project.ProjectInstance;
-import org.apache.kylin.metrics.lib.SinkTool;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
+import org.apache.kylin.tool.metrics.systemcube.streamingv2.KafkaTopicCreator;
+import 
org.apache.kylin.tool.metrics.systemcube.streamingv2.StreamingMetadataCreator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Strings;
 import com.google.common.collect.Lists;
 
+/**
+ * System Cube Metadata Creator CLI
+ */
 public class SCCreator extends AbstractApplication {
 
     private static final Logger logger = 
LoggerFactory.getLogger(SCCreator.class);
@@ -74,8 +77,10 @@ public class SCCreator extends AbstractApplication {
     private static final String D_PROJECT = "project/";
     private static final String D_TABLE = "table/";
     private static final String D_MODEL_DESC = "model_desc/";
+    private static final String D_STREAMING_V2 = "streaming_v2/";
 
     private static final String F_HIVE_SQL = 
"create_hive_tables_for_system_cubes";
+    private static final String F_KAFKA_TOPIC = 
"create_kafka_topic_for_system_cubes";
 
     protected final Options options;
 
@@ -104,7 +109,7 @@ public class SCCreator extends AbstractApplication {
         String output = optionsHelper.getOptionValue(OPTION_OUTPUT);
         String inputConfig = optionsHelper.getOptionValue(OPTION_INPUT_CONFIG);
         if (Strings.isNullOrEmpty(inputConfig)) {
-            throw new RuntimeException("Input configuration file should be 
specified!!!");
+            throw new FileNotFoundException("Input configuration file should 
be specified!!!");
         }
 
         execute(owner, output, inputConfig);
@@ -118,37 +123,51 @@ public class SCCreator extends AbstractApplication {
             output += "/";
         }
 
-        Set<SinkTool> sourceToolSet = JsonUtil.readValueWithTyping(
-                new BufferedInputStream(new FileInputStream(new 
File(inputConfig))), HashSet.class);
-        run(owner, output, sourceToolSet);
+        TypeReference<List<MetricsSinkDesc>> typeRef = new 
TypeReference<List<MetricsSinkDesc>>() {
+        };
+        List<MetricsSinkDesc> metricsSinkDescList = 
JsonUtil.readValue(FileUtils.readFileToString(new File(inputConfig)), typeRef);
+        run(owner, output, metricsSinkDescList);
     }
 
-    private void run(String owner, String output, Collection<SinkTool> 
sinkToolSet) throws IOException {
+    private void run(String owner, String output, List<MetricsSinkDesc> 
sinkToolSet) throws IOException {
         List<TableDesc> kylinTables = Lists.newArrayList();
         List<DataModelDesc> kylinModels = Lists.newArrayList();
-        List<CubeDesc> kylinCubeDescs = Lists.newArrayList();
+        List<CubeDesc> cubeDescList = Lists.newArrayList();
         List<CubeInstance> kylinCubeInstances = Lists.newArrayList();
+        List<StreamingSourceConfig> streamingSourceConfigs = 
Lists.newArrayList();
 
-        boolean ifHive = false;
-        for (SinkTool sourceTool : sinkToolSet) {
-            if (sourceTool instanceof HiveSinkTool) {
-                ifHive = true;
-            } else {
-                logger.warn("current version only support hive sink!!!");
-                continue;
+
+        boolean hasHive = false;
+        boolean hasKafka = false;
+        for (MetricsSinkDesc sinkDesc : sinkToolSet) {
+            if (sinkDesc.useHive()) {
+                hasHive = true;
+            } else if (sinkDesc.useKafka()) {
+                hasKafka = true;
+            }
+            kylinTables.addAll(generateKylinTableForSystemCube(sinkDesc));
+            kylinModels.addAll(generateKylinModelForSystemCube(owner, 
sinkDesc));
+            cubeDescList.addAll(generateKylinCubeDescForSystemCube(sinkDesc));
+            
kylinCubeInstances.addAll(generateKylinCubeInstanceForSystemCube(owner, 
sinkDesc));
+
+            if (sinkDesc.useKafka()) {
+                
streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsJob(config,
 sinkDesc));
+                
streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsJobException(config,
 sinkDesc));
+                
streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQuery(config,
 sinkDesc));
+                
streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQueryRpcCall(config,
 sinkDesc));
+                
streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQueryCube(config,
 sinkDesc));
             }
-            kylinTables.addAll(generateKylinTableForSystemCube(sourceTool));
-            kylinModels.addAll(generateKylinModelForSystemCube(owner, 
sourceTool));
-            
kylinCubeDescs.addAll(generateKylinCubeDescForSystemCube(sourceTool));
-            
kylinCubeInstances.addAll(generateKylinCubeInstanceForSystemCube(owner, 
sourceTool));
         }
 
-        if (ifHive) {
+        if (hasHive) {
             generateHiveTableSQLFileForSystemCube(output);
         }
+        if (hasKafka) {
+            generateKafkaTopicFileForSystemCube(output);
+        }
 
         ProjectInstance projectInstance = 
ProjectCreator.generateKylinProjectInstance(owner, kylinTables, kylinModels,
-                kylinCubeDescs);
+                cubeDescList);
         generateKylinProjectFileForSystemCube(output, projectInstance);
         for (TableDesc tableDesc : kylinTables) {
             generateKylinTableFileForSystemCube(output, tableDesc);
@@ -156,54 +175,58 @@ public class SCCreator extends AbstractApplication {
         for (DataModelDesc dataModelDesc : kylinModels) {
             generateKylinModelFileForSystemCube(output, dataModelDesc);
         }
-        for (CubeDesc cubeDesc : kylinCubeDescs) {
+        for (CubeDesc cubeDesc : cubeDescList) {
             generateKylinCubeDescFileForSystemCube(output, cubeDesc);
         }
         for (CubeInstance cubeInstance : kylinCubeInstances) {
             generateKylinCubeInstanceFileForSystemCube(output, cubeInstance);
         }
+
+        for (StreamingSourceConfig sourceConfig : streamingSourceConfigs) {
+            generateKylinStreamingConfigForSystemCube(output, sourceConfig);
+        }
     }
 
-    private List<TableDesc> generateKylinTableForSystemCube(SinkTool sinkTool) 
{
+    private List<TableDesc> generateKylinTableForSystemCube(MetricsSinkDesc 
sinkDesc) {
         List<TableDesc> result = Lists.newLinkedList();
-        result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, 
sinkTool));
-        
result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, 
sinkTool));
-        
result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, 
sinkTool));
-        result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, 
sinkTool));
-        
result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, 
sinkTool));
+        result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, 
sinkDesc));
+        
result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, 
sinkDesc));
+        
result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, 
sinkDesc));
+        result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, 
sinkDesc));
+        
result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, 
sinkDesc));
 
         return result;
     }
 
-    private List<DataModelDesc> generateKylinModelForSystemCube(String owner, 
SinkTool sinkTool) {
+    private List<DataModelDesc> generateKylinModelForSystemCube(String owner, 
MetricsSinkDesc sinkDesc) {
         List<DataModelDesc> result = Lists.newLinkedList();
-        result.add(ModelCreator.generateKylinModelForMetricsQuery(owner, 
config, sinkTool));
-        result.add(ModelCreator.generateKylinModelForMetricsQueryCube(owner, 
config, sinkTool));
-        result.add(ModelCreator.generateKylinModelForMetricsQueryRPC(owner, 
config, sinkTool));
-        result.add(ModelCreator.generateKylinModelForMetricsJob(owner, config, 
sinkTool));
-        
result.add(ModelCreator.generateKylinModelForMetricsJobException(owner, config, 
sinkTool));
+        result.add(ModelCreator.generateKylinModelForMetricsQuery(owner, 
config, sinkDesc));
+        result.add(ModelCreator.generateKylinModelForMetricsQueryCube(owner, 
config, sinkDesc));
+        result.add(ModelCreator.generateKylinModelForMetricsQueryRPC(owner, 
config, sinkDesc));
+        result.add(ModelCreator.generateKylinModelForMetricsJob(owner, config, 
sinkDesc));
+        
result.add(ModelCreator.generateKylinModelForMetricsJobException(owner, config, 
sinkDesc));
 
         return result;
     }
 
-    private List<CubeDesc> generateKylinCubeDescForSystemCube(SinkTool 
sinkTool) {
+    private List<CubeDesc> generateKylinCubeDescForSystemCube(MetricsSinkDesc 
sinkDesc) {
         List<CubeDesc> result = Lists.newLinkedList();
-        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, 
sinkTool));
-        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, 
sinkTool));
-        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, 
sinkTool));
-        result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, 
sinkTool));
-        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, 
sinkTool));
+        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, 
sinkDesc));
+        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, 
sinkDesc));
+        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, 
sinkDesc));
+        result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, 
sinkDesc));
+        
result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, 
sinkDesc));
 
         return result;
     }
 
-    private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String 
owner, SinkTool sinkTool) {
+    private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String 
owner, MetricsSinkDesc sinkDesc) {
         List<CubeInstance> result = Lists.newLinkedList();
-        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, 
config, sinkTool));
-        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner,
 config, sinkTool));
-        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner,
 config, sinkTool));
-        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, 
config, sinkTool));
-        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner,
 config, sinkTool));
+        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, 
config, sinkDesc));
+        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner,
 config, sinkDesc));
+        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner,
 config, sinkDesc));
+        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, 
config, sinkDesc));
+        
result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner,
 config, sinkDesc));
 
         return result;
     }
@@ -213,6 +236,11 @@ public class SCCreator extends AbstractApplication {
         saveToFile(output + F_HIVE_SQL + ".sql", contents);
     }
 
+    private void generateKafkaTopicFileForSystemCube(String output) throws 
IOException {
+        String contents = KafkaTopicCreator.generateCreateCommand(config);
+        saveToFile(output + F_KAFKA_TOPIC + ".sh", contents);
+    }
+
     private void generateKylinTableFileForSystemCube(String output, TableDesc 
kylinTable) throws IOException {
         saveSystemCubeMetadataToFile(output + D_TABLE + 
kylinTable.getIdentity() + ".json", kylinTable,
                 TableMetadataManager.TABLE_SERIALIZER);
@@ -223,6 +251,11 @@ public class SCCreator extends AbstractApplication {
                 ModelCreator.MODELDESC_SERIALIZER);
     }
 
+    private void generateKylinStreamingConfigForSystemCube(String output, 
StreamingSourceConfig streamingConfig) throws IOException {
+        saveSystemCubeMetadataToFile(output + D_STREAMING_V2 + 
streamingConfig.getName() + ".json", streamingConfig,
+                StreamingMetadataCreator.STREAMING_SOURCE_CONFIG_SERIALIZER);
+    }
+
     private void generateKylinCubeInstanceFileForSystemCube(String output, 
CubeInstance cubeInstance)
             throws IOException {
         saveSystemCubeMetadataToFile(output + D_CUBE_INSTANCE + 
cubeInstance.getName() + ".json", cubeInstance,
@@ -241,7 +274,7 @@ public class SCCreator extends AbstractApplication {
     }
 
     private <T extends RootPersistentEntity> void 
saveSystemCubeMetadataToFile(String fileName, T metadata,
-            Serializer serializer) throws IOException {
+                                                                               
Serializer serializer) throws IOException {
         ByteArrayOutputStream buf = new ByteArrayOutputStream();
         DataOutputStream dout = new DataOutputStream(buf);
         serializer.serialize(metadata, dout);
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java
similarity index 62%
rename from 
tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java
rename to 
tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java
index 5907bf2..8dcf01e 100644
--- 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java
@@ -14,24 +14,27 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
-package org.apache.kylin.tool.metrics.systemcube.util;
+package org.apache.kylin.tool.metrics.systemcube.def;
 
 import java.util.Map;
 
 import org.apache.kylin.metadata.model.ISourceAware;
 import org.apache.kylin.metadata.model.IStorageAware;
-import org.apache.kylin.metrics.lib.SinkTool;
 import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter;
 
 import com.fasterxml.jackson.annotation.JsonAutoDetect;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.collect.Maps;
+import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter;
 
 @SuppressWarnings("serial")
 @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, 
getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = 
JsonAutoDetect.Visibility.NONE, setterVisibility = 
JsonAutoDetect.Visibility.NONE)
-public class HiveSinkTool implements SinkTool {
+public class MetricsSinkDesc {
+
+    @JsonProperty("sink")
+    private String sinkType;
 
     @JsonProperty("storage_type")
     protected int storageType = IStorageAware.ID_SHARDED_HBASE;
@@ -39,16 +42,31 @@ public class HiveSinkTool implements SinkTool {
     @JsonProperty("cube_desc_override_properties")
     protected Map<String, String> cubeDescOverrideProperties = 
Maps.newHashMap();
 
+    @JsonProperty("table_properties")
+    protected Map<String, String> tableProperties = Maps.newHashMap();
+
     public int getStorageType() {
         return storageType;
     }
 
     public int getSourceType() {
-        return ISourceAware.ID_HIVE;
+        if (sinkType.equalsIgnoreCase("hive")) {
+            return ISourceAware.ID_HIVE;
+        } else if (sinkType.equalsIgnoreCase("kafka")) {
+            return ISourceAware.ID_KAFKA;
+        } else {
+            return ISourceAware.ID_HIVE;
+        }
     }
 
     public String getTableNameForMetrics(String subject) {
-        return HiveReservoirReporter.getTableFromSubject(subject);
+        if (sinkType.equalsIgnoreCase("hive")) {
+            return HiveReservoirReporter.getTableFromSubject(subject);
+        } else if (sinkType.equalsIgnoreCase("kafka")) {
+            return KafkaReservoirReporter.getTableFromSubject(subject);
+        } else {
+            return HiveReservoirReporter.getTableFromSubject(subject);
+        }
     }
 
     public Map<String, String> getCubeDescOverrideProperties() {
@@ -58,4 +76,20 @@ public class HiveSinkTool implements SinkTool {
     public void setCubeDescOverrideProperties(Map<String, String> 
cubeDescOverrideProperties) {
         this.cubeDescOverrideProperties = cubeDescOverrideProperties;
     }
+
+    public String getSinkType() {
+        return sinkType;
+    }
+
+    public boolean useKafka(){
+        return sinkType.equalsIgnoreCase("kafka");
+    }
+    public boolean useHive(){
+        return sinkType.equalsIgnoreCase("hive");
+    }
+
+
+    public Map<String, String> getTableProperties() {
+        return tableProperties;
+    }
 }
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
new file mode 100644
index 0000000..c495919
--- /dev/null
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.tool.metrics.systemcube.streamingv2;
+
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter;
+
+public class KafkaTopicCreator {
+
+    public static String generateCreateCommand(KylinConfig config) {
+        StringBuilder sb = new StringBuilder();
+        String[] topics = new String[]{
+                config.getKylinMetricsSubjectQuery(),
+                config.getKylinMetricsSubjectQueryCube(),
+                config.getKylinMetricsSubjectQueryRpcCall(),
+                config.getKylinMetricsSubjectJob(),
+                config.getKylinMetricsSubjectJobException()};
+        for (String topic : topics) {
+            String finalTopic = KafkaReservoirReporter.decorateTopic(topic);
+            sb.append("sh kafka-topics");
+            sb.append(" --create");
+            sb.append(" --topic ");
+            sb.append(finalTopic);
+            sb.append(" --zookeeper localhost:2181");
+            sb.append(" --partitions 3");
+            sb.append(" --replication-factor 1");
+            sb.append(" \n");
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java
 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java
new file mode 100644
index 0000000..93121ce
--- /dev/null
+++ 
b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.tool.metrics.systemcube.streamingv2;
+
+import com.google.common.collect.Lists;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.JsonSerializer;
+import org.apache.kylin.common.persistence.Serializer;
+import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter;
+import org.apache.kylin.stream.core.source.MessageParserInfo;
+import org.apache.kylin.stream.core.source.StreamingSourceConfig;
+import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class StreamingMetadataCreator {
+
+    public static final Serializer<StreamingSourceConfig> 
STREAMING_SOURCE_CONFIG_SERIALIZER = new 
JsonSerializer<>(StreamingSourceConfig.class);
+
+
+    public static StreamingSourceConfig 
generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc 
sinkDesc) {
+        List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery());
+        return generateStreamingV2Config(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQuery(), columns);
+    }
+
+    public static StreamingSourceConfig 
generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc 
sinkDesc) {
+        List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube());
+        return generateStreamingV2Config(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQueryCube(), columns);
+    }
+
+    public static StreamingSourceConfig 
generateKylinTableForMetricsQueryRpcCall(KylinConfig kylinConfig, 
MetricsSinkDesc sinkDesc) {
+        List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC());
+        return generateStreamingV2Config(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns);
+    }
+
+    public static StreamingSourceConfig 
generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc 
sinkDesc) {
+        List<Pair<String, String>> columns = Lists.newLinkedList();
+        columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob());
+        return generateStreamingV2Config(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectJob(), columns);
+    }
+
+    public static StreamingSourceConfig 
generateKylinTableForMetricsJobException(KylinConfig kylinConfig, 
MetricsSinkDesc sinkDesc) {
+        List<Pair<String, String>> columns = Lists.newLinkedList();
+        
columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException());
+        return generateStreamingV2Config(kylinConfig, sinkDesc, 
kylinConfig.getKylinMetricsSubjectJobException(), columns);
+    }
+
+    private static StreamingSourceConfig generateStreamingV2Config(KylinConfig 
kylinConfig, MetricsSinkDesc sinkDesc, String subject, List<Pair<String, 
String>> columns) {
+        StreamingSourceConfig streamingSourceConfig = new 
StreamingSourceConfig();
+        MessageParserInfo parserInfo = new MessageParserInfo();
+        parserInfo.setFormatTs(false);
+        parserInfo.setTsColName("KTIMESTAMP");
+        parserInfo.setTsPattern("MS");
+        
parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser");
+        Map<String, String> columnToSourceFieldMapping = new HashMap<>();
+        for (Pair<String, String> col : columns) {
+            columnToSourceFieldMapping.put(col.getKey(), col.getKey());
+        }
+        parserInfo.setColumnToSourceFieldMapping(columnToSourceFieldMapping);
+
+        Map<String, String> properties = new HashMap<>();
+        String topic = KafkaReservoirReporter.decorateTopic(subject);
+        String table = 
KafkaReservoirReporter.sink.getTableFromSubject(subject);
+
+        properties.put("topic", topic);
+        properties.put("bootstrap.servers", 
sinkDesc.getTableProperties().get("bootstrap.servers"));
+
+        streamingSourceConfig.setName(table);
+        streamingSourceConfig.setProperties(properties);
+        streamingSourceConfig.setParserInfo(parserInfo);
+
+        streamingSourceConfig.updateRandomUuid();
+        streamingSourceConfig.setLastModified(System.currentTimeMillis());
+
+        return streamingSourceConfig;
+    }
+
+}
diff --git a/tool/src/main/resources/SCSinkTools.json 
b/tool/src/main/resources/SCSinkTools.json
index 2872117..101ce0c 100644
--- a/tool/src/main/resources/SCSinkTools.json
+++ b/tool/src/main/resources/SCSinkTools.json
@@ -1,15 +1,24 @@
 [
-  [
-    "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool",
-    {
-      "storage_type": 2,
-      "cube_desc_override_properties": [
-        "java.util.HashMap",
-        {
-          "kylin.cube.algorithm": "INMEM",
-          "kylin.cube.max-building-segments": "1"
-        }
-      ]
+  {
+    "sink": "hive",
+    "storage_type": 2,
+    "cube_desc_override_properties": {
+      "kylin.cube.algorithm": "INMEM",
+      "kylin.cube.max-building-segments": "1"
     }
-  ]
+  },
+  {
+    "sink": "kafka",
+    "storage_type": 3,
+    "cube_desc_override_properties": {
+      "kylin.cube.algorithm": "INMEM",
+      "kylin.stream.cube.window": 28800,
+      "kylin.stream.cube.duration": 3600,
+      "kylin.stream.segment.retention.policy": "purge",
+      "kylin.cube.max-building-segments": "20"
+    },
+    "table_properties": {
+      "bootstrap.servers": "localhost:9092"
+    }
+  }
 ]
\ No newline at end of file
diff --git 
a/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java
 
b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java
index ee553ab..235b4f1 100644
--- 
a/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java
+++ 
b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java
@@ -14,39 +14,33 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.tool.metrics.systemcube;
 
 import static org.junit.Assert.assertEquals;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
 import java.io.File;
-import java.io.FileInputStream;
 import java.io.FileOutputStream;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.io.FileUtils;
 import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.JsonUtil;
 import org.apache.kylin.common.util.LocalFileMetadataTestCase;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.metadata.realization.RealizationStatusEnum;
-import org.apache.kylin.metrics.lib.SinkTool;
-import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool;
+import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
 import org.junit.Test;
 
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
-
 public class SCCreatorTest extends LocalFileMetadataTestCase {
 
     private File tempMetadataDir;
@@ -84,7 +78,7 @@ public class SCCreatorTest extends LocalFileMetadataTestCase {
         CubeManager cubeManager = CubeManager.getInstance(local);
         List<CubeInstance> cubeList = cubeManager.listAllCubes();
         System.out.println("System cubes: " + cubeList);
-        assertEquals(cubeList.size(), 5);
+        assertEquals(cubeList.size(), 10);
 
         for (CubeInstance cube : cubeList) {
             Assert.assertTrue(cube.getStatus() != 
RealizationStatusEnum.DESCBROKEN);
@@ -96,18 +90,15 @@ public class SCCreatorTest extends 
LocalFileMetadataTestCase {
         Map<String, String> cubeDescOverrideProperties = Maps.newHashMap();
         cubeDescOverrideProperties.put("kylin.cube.algorithm", "INMEM");
 
-        HiveSinkTool hiveSinkTool = new HiveSinkTool();
-        hiveSinkTool.setCubeDescOverrideProperties(cubeDescOverrideProperties);
+        MetricsSinkDesc metricsSinkDesc = new MetricsSinkDesc();
+        
metricsSinkDesc.setCubeDescOverrideProperties(cubeDescOverrideProperties);
+        List<MetricsSinkDesc> metricsSinkDescList = Lists.newArrayList();
 
         String outputPath = "src/test/resources/SCSinkTools.json";
-        try (BufferedOutputStream os = new BufferedOutputStream(new 
FileOutputStream(outputPath))) {
-            ObjectMapper mapper = new ObjectMapper();
-            mapper.enableDefaultTyping();
-            mapper.writeValue(os, Sets.newHashSet(hiveSinkTool));
-        }
+        JsonUtil.writeValue(new FileOutputStream(outputPath), 
metricsSinkDescList);
 
-        Set<SinkTool> sinkToolSet = readSinkToolsJson(outputPath);
-        for (SinkTool entry : sinkToolSet) {
+        List<MetricsSinkDesc> sinkToolSet = readSinkToolsJson(outputPath);
+        for (MetricsSinkDesc entry : sinkToolSet) {
             Map<String, String> props = entry.getCubeDescOverrideProperties();
             for (String key : cubeDescOverrideProperties.keySet()) {
                 assertEquals(props.get(key), 
cubeDescOverrideProperties.get(key));
@@ -117,18 +108,17 @@ public class SCCreatorTest extends 
LocalFileMetadataTestCase {
 
     @Test
     public void testReadSinkToolsJson() throws Exception {
-        Set<SinkTool> sinkToolSet = 
readSinkToolsJson("src/main/resources/SCSinkTools.json");
-        for (SinkTool entry : sinkToolSet) {
+        List<MetricsSinkDesc> sinkToolSet = 
readSinkToolsJson("src/main/resources/SCSinkTools.json");
+        for (MetricsSinkDesc entry : sinkToolSet) {
             Map<String, String> props = entry.getCubeDescOverrideProperties();
             assertEquals(props.get("kylin.cube.algorithm"), "INMEM");
         }
     }
 
-    private Set<SinkTool> readSinkToolsJson(String jsonPath) throws Exception {
-        try (BufferedInputStream is = new BufferedInputStream(new 
FileInputStream(jsonPath))) {
-            ObjectMapper mapper = new ObjectMapper();
-            mapper.enableDefaultTyping();
-            return mapper.readValue(is, HashSet.class);
-        }
+    private List<MetricsSinkDesc> readSinkToolsJson(String jsonPath) throws 
Exception {
+        TypeReference<List<MetricsSinkDesc>> typeRef = new 
TypeReference<List<MetricsSinkDesc>>() {
+        };
+        List<MetricsSinkDesc> sourceToolSet = 
JsonUtil.readValue(FileUtils.readFileToString(new File(jsonPath)), typeRef);
+        return sourceToolSet;
     }
 }
\ No newline at end of file

Reply via email to