This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 0081e58 KYLIN-4371 Integrate System Cube with Real-time OLAP (#1102) 0081e58 is described below commit 0081e5844fa8badc7588ac2ffb2875e047608516 Author: Xiaoxiang Yu <hit_la...@126.com> AuthorDate: Wed Feb 26 22:49:04 2020 +0800 KYLIN-4371 Integrate System Cube with Real-time OLAP (#1102) * Refine metrics system * KYLIN-4371 Update System Cube Metadata Creator --- .../org/apache/kylin/common/KylinConfigBase.java | 5 + .../apache/kylin/job/metrics/JobMetricsFacade.java | 9 +- .../org/apache/kylin/metrics/MetricsManager.java | 2 +- .../apache/kylin/metrics/lib/ActiveReservoir.java | 15 ++- ...ilter.java => ActiveReservoirRecordFilter.java} | 21 +++- .../kylin/metrics/lib/ActiveReservoirReporter.java | 4 +- .../java/org/apache/kylin/metrics/lib/Record.java | 6 +- .../java/org/apache/kylin/metrics/lib/Sink.java | 2 +- .../org/apache/kylin/metrics/lib/SinkTool.java | 32 ----- .../metrics/lib/impl/BaseScheduledReporter.java | 3 +- .../kylin/metrics/lib/impl/InstantReservoir.java | 8 +- .../kylin/metrics/lib/impl/MetricsSystem.java | 8 +- .../apache/kylin/metrics/lib/impl/RecordEvent.java | 57 +++++---- .../metrics/lib/impl/RecordEventTimeDetail.java | 5 +- .../kylin/metrics/lib/impl/ReporterBuilder.java | 8 +- .../kylin/metrics/lib/impl/TimePropertyEnum.java | 11 +- .../kylin/metrics/property/JobPropertyEnum.java | 1 + .../kylin/metrics/property/QueryPropertyEnum.java | 1 + .../kylin/metrics/lib/impl/hive/HiveProducer.java | 2 +- .../lib/impl/hive/HiveReservoirReporter.java | 6 +- .../impl/kafka/KafkaActiveReserviorListener.java | 33 ++--- .../lib/impl/kafka/KafkaReservoirReporter.java | 4 +- .../kylin/rest/metrics/QueryMetricsFacade.java | 12 +- .../tool/metrics/systemcube/CubeDescCreator.java | 42 +++---- .../metrics/systemcube/CubeInstanceCreator.java | 26 ++-- .../tool/metrics/systemcube/HiveTableCreator.java | 2 +- .../tool/metrics/systemcube/KylinTableCreator.java | 34 +++-- .../tool/metrics/systemcube/ModelCreator.java | 26 ++-- .../kylin/tool/metrics/systemcube/SCCreator.java | 139 +++++++++++++-------- .../HiveSinkTool.java => def/MetricsSinkDesc.java} | 46 ++++++- .../systemcube/streamingv2/KafkaTopicCreator.java | 46 +++++++ .../streamingv2/StreamingMetadataCreator.java | 100 +++++++++++++++ tool/src/main/resources/SCSinkTools.json | 33 +++-- .../tool/metrics/systemcube/SCCreatorTest.java | 50 +++----- 34 files changed, 505 insertions(+), 294 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 7d5cda8..09f7c8a 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -2208,6 +2208,11 @@ public abstract class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.metrics."); } + public int printSampleEventRatio(){ + String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000"); + return Integer.parseInt(val); + } + // ============================================================================ // tool // ============================================================================ diff --git a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java index e9d41bf..5d5c607 100644 --- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java +++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java @@ -41,13 +41,13 @@ public class JobMetricsFacade { setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType); setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, jobStats.buildDuration, - jobStats.waitResourceTime, jobStats.perBytesTimeCost, // + jobStats.waitResourceTime, jobStats.perBytesTimeCost, jobStats.dColumnDistinct, jobStats.dDictBuilding, jobStats.dCubingInmem, jobStats.dHfileConvert); } else { metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()); setJobExceptionWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId, jobStats.jobType, jobStats.cubingType, // - jobStats.throwable.getClass()); + jobStats.throwable); } MetricsManager.getInstance().update(metricsEvent); } @@ -78,9 +78,10 @@ public class JobMetricsFacade { private static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String user, String projectName, String cubeName, String jobId, String jobType, String cubingType, - Class<T> throwableClass) { + Throwable throwable) { setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, jobType, cubingType); - metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwableClass.getName()); + metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwable.getClass().getName()); + metricsEvent.put(JobPropertyEnum.EXCEPTION_MSG.toString(), throwable.getMessage()); } public static class JobStatisticsResult { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java index a5f1088..c0fce0b 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java @@ -131,7 +131,7 @@ public class MetricsManager { .start(); } catch (Exception e) { logger.warn("Cannot initialize ActiveReservoirReporter: Builder class - " + subEntry.getFirst() - + ", Properties - " + subEntry.getSecond()); + + ", Properties - " + subEntry.getSecond(), e); } } Metrics.register(registerName, activeReservoir); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java index 542a781..1a3c8db 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java @@ -21,24 +21,24 @@ package org.apache.kylin.metrics.lib; import java.io.Closeable; /** - * Reservoir for mertics message, a Reservoir(something like cache)'s duty is store mertics message temporarily - * and emit messages to external Sink by notifying specific ActiveReservoirListener. + * Reservoir for metrics event, a Reservoir(something like cache/buffer)'s duty is store metrics event temporarily + * and emit events to external Sink by notifying specific ActiveReservoirListener. */ public interface ActiveReservoir extends Closeable { /** - * @return how many mertics message was currently cached(not emit) + * @return how many metrics message was currently cached(not emit) */ int size(); /** * stage metrics message into Reservoir, but whether to emit it to external storage - * immediately is decided by specific implemention + * immediately is decided by specific implementation */ void update(Record record); /** - * add listener which responsed to message update + * add listener which is in charge of send metrics event update */ void addListener(ActiveReservoirListener listener); @@ -46,10 +46,13 @@ public interface ActiveReservoir extends Closeable { void removeAllListener(); + /** + * A backup listener, it will be called when one of the previous listener failed. + */ void setHAListener(ActiveReservoirListener listener); /** - * do some prepare to accept metrics message + * do some prepare to accept metrics event */ void start(); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java similarity index 72% rename from core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java rename to core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java index 5cffcfc..5b731e5 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirFilter.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirRecordFilter.java @@ -14,19 +14,19 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib; /** * A filter used to determine whether or not an active reservoir should be reported, among other things. */ -public interface ActiveReservoirFilter { +public interface ActiveReservoirRecordFilter { /** * Matches all active reservoirs, regardless of type or name. */ - ActiveReservoirFilter ALL = new ActiveReservoirFilter() { + ActiveReservoirRecordFilter ALL = new ActiveReservoirRecordFilter() { @Override public boolean matches(String name, ActiveReservoir activeReservoir) { return true; @@ -36,9 +36,18 @@ public interface ActiveReservoirFilter { /** * Returns {@code true} if the active reservoir matches the filter; {@code false} otherwise. * - * @param name the active reservoir's name - * @param activeReservoir the active reservoir + * @param name the active reservoir's name + * @param activeReservoir the active reservoir * @return {@code true} if the active reservoir matches the filter */ - boolean matches(String name, ActiveReservoir activeReservoir); + default boolean matches(String name, ActiveReservoir activeReservoir) { + return true; + } + + /** + * This method is used to check and whether to filter a Record on listener side + */ + default boolean checkRecord(Record record) { + return true; + } } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java index fa807e8..9b13018 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib; @@ -27,7 +27,7 @@ import org.apache.kylin.common.util.Pair; import com.google.common.base.Strings; /** - * ActiveReservoirReporter report metrics message from ActiveReservoir + * ActiveReservoirReporter report metrics event via listener from ActiveReservoir */ public abstract class ActiveReservoirReporter implements Closeable { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java index 112951ad..0ce6d74 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java @@ -21,14 +21,14 @@ package org.apache.kylin.metrics.lib; import java.util.Map; /** - * Metrics message + * Metrics event in Metrics System */ public interface Record { /** - * For classification + * For classification, will mapping to Hive Table or Kafka Topic */ - String getType(); + String getSubject(); /** * For keep ordering in the same category diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java index a04356d..681c131 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java @@ -19,7 +19,7 @@ package org.apache.kylin.metrics.lib; /** - * Sink is where mertics data will write to + * Sink is where metrics event will write to */ public interface Sink { String getTableFromSubject(String subject); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java deleted file mode 100644 index b55516a..0000000 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/SinkTool.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.metrics.lib; - -import java.io.Serializable; -import java.util.Map; - -public interface SinkTool extends Serializable { - int getStorageType(); - - int getSourceType(); - - String getTableNameForMetrics(String subject); - - Map<String, String> getCubeDescOverrideProperties(); -} diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java index c006352..735a69c 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java @@ -29,9 +29,8 @@ import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; /** - * @deprecated abandon code which seems to be replaced by ActiveReservoirReporter, should be removed later + * Extension Point for use-defined, fix-rated Metrics Reporter */ -@Deprecated public abstract class BaseScheduledReporter implements Closeable { private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java index 6bd9539..78de933 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java @@ -29,7 +29,7 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; /** - * A Reservoir which don't staged metrics message at all, emit them in no time. + * A Reservoir which don't staged metrics event at all, emit them in no time. */ public class InstantReservoir extends AbstractActiveReservoir { @@ -52,8 +52,7 @@ public class InstantReservoir extends AbstractActiveReservoir { for (ActiveReservoirListener listener : listeners) { if (!notifyListenerOfUpdatedRecord(listener, record)) { ifSucceed = false; - logger.warn( - "It fails to notify listener " + listener.toString() + " of updated record " + Arrays.toString(record.getKey())); + logger.info("Fails to notify {} of record {}", listener, Arrays.toString(record.getKey())); } } if (!ifSucceed) { @@ -68,8 +67,7 @@ public class InstantReservoir extends AbstractActiveReservoir { } private boolean notifyListenerHAOfUpdatedRecord(Record record) { - logger.info("The HA listener " + listenerHA.toString() + " for updated record " + Arrays.toString(record.getKey()) - + " will be started"); + logger.info("Use HA Listener {} to notify record {}", listenerHA, Arrays.toString(record.getKey())); if (!notifyListenerOfUpdatedRecord(listenerHA, record)) { logger.error("The HA listener also fails!!!"); return false; diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java index feae5c5..1e46bce 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/MetricsSystem.java @@ -27,7 +27,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metrics.lib.ActiveReservoir; -import org.apache.kylin.metrics.lib.ActiveReservoirFilter; +import org.apache.kylin.metrics.lib.ActiveReservoirRecordFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -98,7 +98,7 @@ public class MetricsSystem extends MetricRegistry { * * @param filter a filter */ - public void removeActiveReservoirMatching(ActiveReservoirFilter filter) { + public void removeActiveReservoirMatching(ActiveReservoirRecordFilter filter) { for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) { if (filter.matches(entry.getKey(), entry.getValue())) { removeActiveReservoir(entry.getKey()); @@ -123,7 +123,7 @@ public class MetricsSystem extends MetricRegistry { * @return all the active reservoirs in the metrics system */ public SortedMap<String, ActiveReservoir> getActiveReservoirs() { - return getActiveReservoirs(ActiveReservoirFilter.ALL); + return getActiveReservoirs(ActiveReservoirRecordFilter.ALL); } /** @@ -132,7 +132,7 @@ public class MetricsSystem extends MetricRegistry { * @param filter the active reservoir filter to match * @return all the active reservoirs in the metrics system */ - public SortedMap<String, ActiveReservoir> getActiveReservoirs(ActiveReservoirFilter filter) { + public SortedMap<String, ActiveReservoir> getActiveReservoirs(ActiveReservoirRecordFilter filter) { final TreeMap<String, ActiveReservoir> reservoirs = new TreeMap<>(); for (Map.Entry<String, ActiveReservoir> entry : activeReservoirs.entrySet()) { if (filter.matches(entry.getKey(), entry.getValue())) { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java index 5118fc9..7f1d02d 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEvent.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib.impl; @@ -28,24 +28,32 @@ import java.util.Collection; import java.util.Map; import java.util.Set; -import org.apache.kylin.common.threadlocal.InternalThreadLocal; import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.metrics.lib.Record; import com.google.common.collect.Maps; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class RecordEvent implements Record, Map<String, Object>, Serializable { - private static final InternalThreadLocal<ByteArrayOutputStream> _localBaos = new InternalThreadLocal<ByteArrayOutputStream>(); + private static final Logger logger = LoggerFactory.getLogger(RecordEvent.class); + + private static final ThreadLocal<ByteArrayOutputStream> _localBaos = new ThreadLocal<>(); + + public static final String LOCAL_HOSTNAME; - static String localHostname; static { + String localHostname1; try { InetAddress addr = InetAddress.getLocalHost(); - localHostname = addr.getHostName() + ":" + addr.getHostAddress(); + localHostname1 = addr.getHostName() + ":" + addr.getHostAddress(); + logger.info("RecordEvent using hostname : {}.", localHostname1); } catch (UnknownHostException e) { - localHostname = "Unknown"; + logger.info("Unexpected ", e); + localHostname1 = "Unknown"; } + LOCAL_HOSTNAME = localHostname1; } private final Map<String, Object> backingMap; @@ -55,11 +63,7 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { } public RecordEvent(String eventType) { - this(eventType, localHostname); - } - - public RecordEvent(String eventType, long time) { - this(eventType, localHostname, time); + this(eventType, LOCAL_HOSTNAME); } public RecordEvent(String eventType, String host) { @@ -71,28 +75,27 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { } /** - * * @param map - * @param eventType mandatory with null check - * @param host mandatory without null check - * @param time mandatory with null check + * @param eventType mandatory with null check + * @param host mandatory without null check + * @param time mandatory with null check */ public RecordEvent(Map<String, Object> map, String eventType, String host, long time) { - backingMap = map != null ? map : Maps.<String, Object> newHashMap(); + backingMap = map != null ? map : Maps.<String, Object>newHashMap(); setEventType(eventType); setHost(host); setTime(time); } public String getEventType() { - return (String) get(RecordReserveKeyEnum.TYPE.toString()); + return (String) get(RecordReserveKeyEnum.EVENT_SUBJECT.toString()); } private void setEventType(String eventType) { if (eventType == null) { throw new IllegalArgumentException("EventType cannot be null."); } - put(RecordReserveKeyEnum.TYPE.toString(), eventType); + put(RecordReserveKeyEnum.EVENT_SUBJECT.toString(), eventType); } public String getHost() { @@ -202,7 +205,7 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { } @Override - public String getType() { + public String getSubject() { return getEventType(); } @@ -211,20 +214,20 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { return (getHost() + "-" + getTime() + "-" + getID()).getBytes(StandardCharsets.UTF_8); } - @Override /** * Event type and time does not belong to value part */ + @Override public Map<String, Object> getValueRaw() { Map<String, Object> cloneMap = Maps.newHashMap(backingMap); - cloneMap.remove(RecordReserveKeyEnum.TYPE.toString()); + cloneMap.remove(RecordReserveKeyEnum.EVENT_SUBJECT.toString()); return cloneMap; } - @Override /** * Event type does not belong to value part, it's for classification */ + @Override public byte[] getValue() { try { ByteArrayOutputStream baos = _localBaos.get(); @@ -248,11 +251,15 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { } public enum RecordReserveKeyEnum { - TYPE("EVENT_TYPE"), ID("EVENT_ID"), HOST("HOST"), TIME("KTIMESTAMP"); + + EVENT_SUBJECT("EVENT_TYPE") + , ID("EVENT_ID") // Not used currently + , HOST("HOST") + , TIME("KTIMESTAMP"); private final String reserveKey; - private RecordReserveKeyEnum(String key) { + RecordReserveKeyEnum(String key) { this.reserveKey = key; } @@ -263,7 +270,7 @@ public class RecordEvent implements Record, Map<String, Object>, Serializable { public RecordReserveKeyEnum getByKey(String key) { for (RecordReserveKeyEnum reserveKey : RecordReserveKeyEnum.values()) { - if (reserveKey.reserveKey.equals(key)) { + if (reserveKey.reserveKey.equalsIgnoreCase(key)) { return reserveKey; } } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java index aa1d307..4c549bc 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/RecordEventTimeDetail.java @@ -24,12 +24,11 @@ import java.util.Locale; import java.util.TimeZone; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.threadlocal.InternalThreadLocal; public class RecordEventTimeDetail { private static final TimeZone timeZone; - private static final InternalThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>(); - private static final InternalThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new InternalThreadLocal<SimpleDateFormat>(); + private static final ThreadLocal<SimpleDateFormat> dateFormatThreadLocal = new ThreadLocal<>(); + private static final ThreadLocal<SimpleDateFormat> timeFormatThreadLocal = new ThreadLocal<>(); static { timeZone = TimeZone.getTimeZone(KylinConfig.getInstanceFromEnv().getTimeZone()); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java index 22fadd3..0b6ef35 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/ReporterBuilder.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib.impl; @@ -23,9 +23,15 @@ import java.util.Properties; import org.apache.kylin.metrics.lib.ActiveReservoir; import org.apache.kylin.metrics.lib.ActiveReservoirReporter; +/** + * Builder of ActiveReservoirReporter + */ public abstract class ReporterBuilder { protected final ActiveReservoir registry; protected final Properties props; + public static final String LISTENER_FILTER_CLASS = "listener.filter.class"; + public static final String CALLBACK_URL = "callback.url"; + public static final String LISTENER_FILTER_DEFAULT_CLASS = "org.apache.kylin.metrics.lib.impl.callback.CallbackActiveReservoirFilter"; protected ReporterBuilder(ActiveReservoir activeReservoir) { this.registry = activeReservoir; diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java index c013b4c..b6e700e 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/TimePropertyEnum.java @@ -23,9 +23,14 @@ import java.util.Locale; import com.google.common.base.Strings; public enum TimePropertyEnum { - YEAR("KYEAR_BEGIN_DATE"), MONTH("KMONTH_BEGIN_DATE"), WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), DAY_DATE( - "KDAY_DATE"), DAY_TIME( - "KDAY_TIME"), TIME_HOUR("KTIME_HOUR"), TIME_MINUTE("KTIME_MINUTE"), TIME_SECOND("KTIME_SECOND"); + YEAR("KYEAR_BEGIN_DATE"), + MONTH("KMONTH_BEGIN_DATE"), + WEEK_BEGIN_DATE("KWEEK_BEGIN_DATE"), + DAY_DATE("KDAY_DATE"), + DAY_TIME("KDAY_TIME"), + TIME_HOUR("KTIME_HOUR"), + TIME_MINUTE("KTIME_MINUTE"), + TIME_SECOND("KTIME_SECOND"); private final String propertyName; diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java index e55cf2b..cb36599 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java @@ -33,6 +33,7 @@ public enum JobPropertyEnum { ALGORITHM("CUBING_TYPE"), STATUS("JOB_STATUS"), EXCEPTION("EXCEPTION"), + EXCEPTION_MSG("EXCEPTION_MSG"), SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java index e9c51df..c83c669 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java @@ -28,6 +28,7 @@ import com.google.common.base.Strings; public enum QueryPropertyEnum { ID_CODE("QUERY_HASH_CODE"), + SQL("QUERY_SQL"), TYPE("QUERY_TYPE"), USER("KUSER"), PROJECT("PROJECT"), diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java index 5ab6c9f..c2ec1ec 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -257,7 +257,7 @@ public class HiveProducer { partitionKVs.put(TimePropertyEnum.DAY_DATE.toString(), rawValue.get(TimePropertyEnum.DAY_DATE.toString()).toString()); - return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getType()), partitionKVs, + return parseToHiveProducerRecord(HiveReservoirReporter.getTableFromSubject(record.getSubject()), partitionKVs, rawValue); } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java index 8e8f3b6..9d93e99 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java @@ -132,10 +132,10 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { try { Map<String, List<Record>> queues = new HashMap<>(); for (Record record : records) { - List<Record> recordQueues = queues.get(record.getType()); + List<Record> recordQueues = queues.get(record.getSubject()); if (recordQueues == null) { recordQueues = new ArrayList<>(); - queues.put(record.getType(), recordQueues); + queues.put(record.getSubject(), recordQueues); } recordQueues.add(record); } @@ -153,7 +153,7 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { public boolean onRecordUpdate(final Record record) { try { - HiveProducer producer = getProducer(record.getType()); + HiveProducer producer = getProducer(record.getSubject()); producer.send(record); } catch (Exception e) { logger.error(e.getMessage(), e); diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java index 311f3e3..df79c57 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaActiveReserviorListener.java @@ -23,6 +23,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.kafka.clients.producer.Callback; import org.apache.kafka.clients.producer.RecordMetadata; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.metrics.lib.ActiveReservoirListener; import org.apache.kylin.metrics.lib.Record; import org.slf4j.Logger; @@ -33,19 +34,18 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis public static final long TOPIC_AVAILABLE_TAG = 0L; protected static final Logger logger = LoggerFactory.getLogger(KafkaActiveReserviorListener.class); protected Long maxBlockMs = 1800000L; - protected int maxRecordForLogNum = 10000; + protected int maxRecordForLogNum = KylinConfig.getInstanceFromEnv().printSampleEventRatio(); protected int maxRecordSkipForLogNum = 10000; protected ConcurrentHashMap<String, Long> topicsIfAvailable = new ConcurrentHashMap<>(); - private int nRecord = 0; - private int nRecordSkip = 0; - private Callback produceCallback = new Callback() { - @Override - public void onCompletion(RecordMetadata metadata, Exception exception) { - if (exception != null) { - exception.printStackTrace(); - return; - } - logger.info("topic:" + metadata.topic() + "; partition: " + metadata.partition() + "; offset: " + metadata.offset()); + private long nRecord = 0; + private long nRecordSkip = 0; + private int threshold = Integer.min((int)(maxRecordForLogNum * 0.002), 25); + + private Callback produceCallback = (RecordMetadata metadata, Exception exception) -> { + if(exception != null){ + logger.warn("Unexpected exception.", exception); + } else { + logger.debug("Topic:{} ; partition:{} ; offset:{} .", metadata.topic(), metadata.partition(), metadata.offset()); } }; @@ -67,17 +67,19 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis public boolean onRecordUpdate(final List<Record> records) { try { for (Record record : records) { - String topic = decorateTopic(record.getType()); + String topic = decorateTopic(record.getSubject()); + if (nRecord <= threshold) { + logger.debug("Send record {} to topic : {}", record, topic); + } if (!checkAvailable(topic)) { if (nRecordSkip % maxRecordSkipForLogNum == 0) { nRecordSkip = 0; - logger.warn("Skip to send record to topic " + topic); + logger.warn("Skip to send record to topic {}", topic); } nRecordSkip++; continue; } if (nRecord % maxRecordForLogNum == 0) { - nRecord = 0; sendWrapper(topic, record, produceCallback); } else { sendWrapper(topic, record, null); @@ -101,7 +103,7 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis topicsIfAvailable.put(topic, TOPIC_AVAILABLE_TAG); return true; } catch (org.apache.kafka.common.errors.TimeoutException e) { - logger.warn("Fail to fetch metadata for topic " + topic); + logger.warn("Fail to fetch metadata for topic " + topic, e); setUnAvailable(topic); return false; } @@ -110,6 +112,7 @@ public abstract class KafkaActiveReserviorListener implements ActiveReservoirLis } protected void setUnAvailable(String topic) { + logger.debug("Cannot find topic {}", topic); topicsIfAvailable.put(topic, System.currentTimeMillis()); } } diff --git a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java index a5ea3aa..a7b58a6 100644 --- a/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java +++ b/metrics-reporter-kafka/src/main/java/org/apache/kylin/metrics/lib/impl/kafka/KafkaReservoirReporter.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib.impl.kafka; @@ -58,7 +58,7 @@ public class KafkaReservoirReporter extends ActiveReservoirReporter { return new Builder(activeReservoir); } - private static String decorateTopic(String topic) { + public static String decorateTopic(String topic) { return ActiveReservoirReporter.KYLIN_PREFIX + "_" + KAFKA_REPORTER_SUFFIX + "_" + topic; } diff --git a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java index 226166d..11f5f6e 100644 --- a/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java +++ b/server-base/src/main/java/org/apache/kylin/rest/metrics/QueryMetricsFacade.java @@ -56,7 +56,7 @@ public class QueryMetricsFacade { private static final HashFunction hashFunc = Hashing.murmur3_128(); private static boolean enabled = false; - private static ConcurrentHashMap<String, QueryMetrics> metricsMap = new ConcurrentHashMap<String, QueryMetrics>(); + private static ConcurrentHashMap<String, QueryMetrics> metricsMap = new ConcurrentHashMap<>(); public static void init() { enabled = KylinConfig.getInstanceFromEnv().getQueryMetricsEnabled(); @@ -66,7 +66,7 @@ public class QueryMetricsFacade { DefaultMetricsSystem.initialize("Kylin"); } - public static long getSqlHashCode(String sql) { + private static long getSqlHashCode(String sql) { return hashFunc.hashString(sql, Charset.forName("UTF-8")).asLong(); } @@ -112,12 +112,11 @@ public class QueryMetricsFacade { //For update rpc level related metrics MetricsManager.getInstance().update(rpcMetricsEvent); } - long sqlHashCode = getSqlHashCode(sqlRequest.getSql()); for (QueryContext.CubeSegmentStatisticsResult contextEntry : sqlResponse.getCubeSegmentStatisticsList()) { RecordEvent queryMetricsEvent = new TimedRecordEvent( KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectQuery()); setQueryWrapper(queryMetricsEvent, // - user, sqlHashCode, sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(), + user, sqlRequest.getSql(), sqlResponse.isStorageCacheUsed() ? "CACHE" : contextEntry.getQueryType(), norm(sqlRequest.getProject()), contextEntry.getRealization(), contextEntry.getRealizationType(), sqlResponse.getThrowable()); @@ -206,10 +205,11 @@ public class QueryMetricsFacade { metricsEvent.put(QueryCubePropertyEnum.WEIGHT_PER_HIT.toString(), weightPerHit); } - private static void setQueryWrapper(RecordEvent metricsEvent, String user, long queryHashCode, String queryType, + private static void setQueryWrapper(RecordEvent metricsEvent, String user, String sql, String queryType, String projectName, String realizationName, int realizationType, Throwable throwable) { metricsEvent.put(QueryPropertyEnum.USER.toString(), user); - metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), queryHashCode); + metricsEvent.put(QueryPropertyEnum.ID_CODE.toString(), getSqlHashCode(sql)); + metricsEvent.put(QueryPropertyEnum.SQL.toString(), sql); metricsEvent.put(QueryPropertyEnum.TYPE.toString(), queryType); metricsEvent.put(QueryPropertyEnum.PROJECT.toString(), projectName); metricsEvent.put(QueryPropertyEnum.REALIZATION.toString(), realizationName); diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java index f525e44..5f64da0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeDescCreator.java @@ -40,7 +40,6 @@ import org.apache.kylin.metadata.model.FunctionDesc; import org.apache.kylin.metadata.model.IEngineAware; import org.apache.kylin.metadata.model.MeasureDesc; import org.apache.kylin.metadata.model.ParameterDesc; -import org.apache.kylin.metrics.lib.SinkTool; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; @@ -51,11 +50,12 @@ import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; public class CubeDescCreator { - public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()); + public static CubeDesc generateKylinCubeDescForMetricsQuery(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery()); //Set for dimensions List<String> dimensions = ModelCreator.getDimensionsForMetricsQuery(); @@ -131,12 +131,12 @@ public class CubeDescCreator { HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); - return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, - rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()); + public static CubeDesc generateKylinCubeDescForMetricsQueryCube(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube()); //Set for dimensions List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryCube(); @@ -218,12 +218,12 @@ public class CubeDescCreator { HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); - return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, - rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()); + public static CubeDesc generateKylinCubeDescForMetricsQueryRPC(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall()); //Set for dimensions List<String> dimensions = ModelCreator.getDimensionsForMetricsQueryRPC(); @@ -288,12 +288,12 @@ public class CubeDescCreator { HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); - return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, - rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig config, SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob()); + public static CubeDesc generateKylinCubeDescForMetricsJob(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJob()); //Set for dimensions List<String> dimensions = ModelCreator.getDimensionsForMetricsJob(); @@ -368,12 +368,12 @@ public class CubeDescCreator { HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); - return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, - rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } - public static CubeDesc generateKylinCubeDescForMetricsJobException(KylinConfig config, SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()); + public static CubeDesc generateKylinCubeDescForMetricsJobException(KylinConfig config, MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJobException()); //Set for dimensions List<String> dimensions = ModelCreator.getDimensionsForMetricsJobException(); @@ -432,8 +432,8 @@ public class CubeDescCreator { HBaseMappingDesc hBaseMapping = new HBaseMappingDesc(); hBaseMapping.setColumnFamily(getHBaseColumnFamily(measureDescList)); - return generateKylinCubeDesc(tableName, sinkTool.getStorageType(), dimensionDescList, measureDescList, - rowKeyDesc, aggGroup, hBaseMapping, sinkTool.getCubeDescOverrideProperties()); + return generateKylinCubeDesc(tableName, sinkDesc.getStorageType(), dimensionDescList, measureDescList, + rowKeyDesc, aggGroup, hBaseMapping, sinkDesc.getCubeDescOverrideProperties()); } public static CubeDesc generateKylinCubeDesc(String tableName, int storageType, diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java index 0fcec3b..7d70bc2 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/CubeInstanceCreator.java @@ -27,16 +27,14 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.metadata.model.Segments; import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.metrics.lib.SinkTool; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; public class CubeInstanceCreator { public static void main(String[] args) throws Exception { - // KylinConfig.setSandboxEnvIfPossible(); KylinConfig config = KylinConfig.getInstanceFromEnv(); - CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new HiveSinkTool()); + CubeInstance cubeInstance = generateKylinCubeInstanceForMetricsQuery("ADMIN", config, new MetricsSinkDesc()); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); CubeManager.CUBE_SERIALIZER.serialize(cubeInstance, dout); @@ -46,31 +44,31 @@ public class CubeInstanceCreator { } public static CubeInstance generateKylinCubeInstanceForMetricsQuery(String owner, KylinConfig config, - SinkTool sinkTool) { - return generateKylinCubeInstance(owner, sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQuery())); + MetricsSinkDesc sinkDesc) { + return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQuery())); } public static CubeInstance generateKylinCubeInstanceForMetricsQueryCube(String owner, KylinConfig config, - SinkTool sinkTool) { + MetricsSinkDesc sinkDesc) { return generateKylinCubeInstance(owner, - sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube())); + sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryCube())); } public static CubeInstance generateKylinCubeInstanceForMetricsQueryRPC(String owner, KylinConfig config, - SinkTool sinkTool) { + MetricsSinkDesc sinkDesc) { return generateKylinCubeInstance(owner, - sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall())); + sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectQueryRpcCall())); } public static CubeInstance generateKylinCubeInstanceForMetricsJob(String owner, KylinConfig config, - SinkTool sinkTool) { - return generateKylinCubeInstance(owner, sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJob())); + MetricsSinkDesc sinkDesc) { + return generateKylinCubeInstance(owner, sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJob())); } public static CubeInstance generateKylinCubeInstanceForMetricsJobException(String owner, KylinConfig config, - SinkTool sinkTool) { + MetricsSinkDesc sinkDesc) { return generateKylinCubeInstance(owner, - sinkTool.getTableNameForMetrics(config.getKylinMetricsSubjectJobException())); + sinkDesc.getTableNameForMetrics(config.getKylinMetricsSubjectJobException())); } public static CubeInstance generateKylinCubeInstance(String owner, String tableName) { diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java index d07cd08..f6e4cfe 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/HiveTableCreator.java @@ -39,7 +39,6 @@ import com.google.common.collect.Lists; public class HiveTableCreator { public static void main(String[] args) { - // KylinConfig.setSandboxEnvIfPossible(); KylinConfig config = KylinConfig.getInstanceFromEnv(); System.out.println(generateAllSQL(config)); @@ -272,6 +271,7 @@ public class HiveTableCreator { return null; } + @Override public String toString() { return typeName; } diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java index c84df4a..76fa7bd 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/KylinTableCreator.java @@ -30,18 +30,16 @@ import org.apache.kylin.metadata.model.ColumnDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metrics.MetricsManager; import org.apache.kylin.metrics.lib.ActiveReservoirReporter; -import org.apache.kylin.metrics.lib.SinkTool; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; import com.google.common.collect.Lists; public class KylinTableCreator { public static void main(String[] args) throws Exception { - // KylinConfig.setSandboxEnvIfPossible(); KylinConfig config = KylinConfig.getInstanceFromEnv(); - TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new HiveSinkTool()); + TableDesc kylinTable = generateKylinTableForMetricsQuery(config, new MetricsSinkDesc()); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); TableMetadataManager.TABLE_SERIALIZER.serialize(kylinTable, dout); @@ -50,53 +48,53 @@ public class KylinTableCreator { System.out.println(buf.toString("UTF-8")); } - public static TableDesc generateKylinTableForMetricsQuery(KylinConfig kylinConfig, SinkTool sinkTool) { + public static TableDesc generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQuery(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuery(), columns); } - public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, SinkTool sinkTool) { + public static TableDesc generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); } - public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, SinkTool sinkTool) { + public static TableDesc generateKylinTableForMetricsQueryRPC(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); } - public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, SinkTool sinkTool) { + public static TableDesc generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJob(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectJob(), columns); } - public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, SinkTool sinkTool) { + public static TableDesc generateKylinTableForMetricsJobException(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { List<Pair<String, String>> columns = Lists.newLinkedList(); columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException()); columns.addAll(HiveTableCreator.getPartitionKVsForHiveTable()); - return generateKylinTable(kylinConfig, sinkTool, kylinConfig.getKylinMetricsSubjectJobException(), columns); + return generateKylinTable(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectJobException(), columns); } - public static TableDesc generateKylinTable(KylinConfig kylinConfig, SinkTool sinkTool, String subject, - List<Pair<String, String>> columns) { + public static TableDesc generateKylinTable(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc, String subject, + List<Pair<String, String>> columns) { TableDesc kylinTable = new TableDesc(); Pair<String, String> tableNameSplits = ActiveReservoirReporter - .getTableNameSplits(sinkTool.getTableNameForMetrics(subject)); + .getTableNameSplits(sinkDesc.getTableNameForMetrics(subject)); kylinTable.setUuid(RandomUtil.randomUUID().toString()); kylinTable.setDatabase(tableNameSplits.getFirst()); kylinTable.setName(tableNameSplits.getSecond()); kylinTable.setTableType(null); kylinTable.setLastModified(0L); - kylinTable.setSourceType(sinkTool.getSourceType()); + kylinTable.setSourceType(sinkDesc.getSourceType()); ColumnDesc[] columnDescs = new ColumnDesc[columns.size()]; for (int i = 0; i < columns.size(); i++) { diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java index cc94411..6f4a18f 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/ModelCreator.java @@ -29,14 +29,13 @@ import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.JoinTableDesc; import org.apache.kylin.metadata.model.ModelDimensionDesc; import org.apache.kylin.metadata.model.PartitionDesc; -import org.apache.kylin.metrics.lib.SinkTool; import org.apache.kylin.metrics.lib.impl.RecordEvent; import org.apache.kylin.metrics.lib.impl.TimePropertyEnum; import org.apache.kylin.metrics.property.JobPropertyEnum; import org.apache.kylin.metrics.property.QueryCubePropertyEnum; import org.apache.kylin.metrics.property.QueryPropertyEnum; import org.apache.kylin.metrics.property.QueryRPCPropertyEnum; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; import com.google.common.collect.Lists; @@ -45,10 +44,9 @@ public class ModelCreator { public static final Serializer<DataModelDesc> MODELDESC_SERIALIZER = new JsonSerializer<>(DataModelDesc.class); public static void main(String[] args) throws Exception { - // KylinConfig.setSandboxEnvIfPossible(); KylinConfig config = KylinConfig.getInstanceFromEnv(); - DataModelDesc kylinModel = generateKylinModelForMetricsQuery("ADMIN", config, new HiveSinkTool()); + DataModelDesc kylinModel = generateKylinModelForMetricsQuery("ADMIN", config, new MetricsSinkDesc()); ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); MODELDESC_SERIALIZER.serialize(kylinModel, dout); @@ -66,36 +64,36 @@ public class ModelCreator { } public static DataModelDesc generateKylinModelForMetricsQuery(String owner, KylinConfig kylinConfig, - SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery()); + MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQuery()); return generateKylinModel(owner, tableName, getDimensionsForMetricsQuery(), getMeasuresForMetricsQuery(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsQueryCube(String owner, KylinConfig kylinConfig, - SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube()); + MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryCube()); return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryCube(), getMeasuresForMetricsQueryCube(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsQueryRPC(String owner, KylinConfig kylinConfig, - SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall()); + MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectQueryRpcCall()); return generateKylinModel(owner, tableName, getDimensionsForMetricsQueryRPC(), getMeasuresForMetricsQueryRPC(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsJob(String owner, KylinConfig kylinConfig, - SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJob()); + MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJob()); return generateKylinModel(owner, tableName, getDimensionsForMetricsJob(), getMeasuresForMetricsJob(), getPartitionDesc(tableName)); } public static DataModelDesc generateKylinModelForMetricsJobException(String owner, KylinConfig kylinConfig, - SinkTool sinkTool) { - String tableName = sinkTool.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJobException()); + MetricsSinkDesc sinkDesc) { + String tableName = sinkDesc.getTableNameForMetrics(kylinConfig.getKylinMetricsSubjectJobException()); return generateKylinModel(owner, tableName, getDimensionsForMetricsJobException(), getMeasuresForMetricsJobException(), getPartitionDesc(tableName)); } diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java index 8a6c98c..a2809c0 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/SCCreator.java @@ -14,28 +14,26 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.tool.metrics.systemcube; -import java.io.BufferedInputStream; import java.io.BufferedWriter; import java.io.ByteArrayOutputStream; import java.io.DataOutputStream; import java.io.File; -import java.io.FileInputStream; +import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; import java.io.OutputStreamWriter; import java.nio.charset.StandardCharsets; -import java.util.Collection; -import java.util.HashSet; import java.util.List; -import java.util.Set; +import com.fasterxml.jackson.core.type.TypeReference; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.cli.Options; +import org.apache.commons.io.FileUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.persistence.Serializer; @@ -50,14 +48,19 @@ import org.apache.kylin.metadata.TableMetadataManager; import org.apache.kylin.metadata.model.DataModelDesc; import org.apache.kylin.metadata.model.TableDesc; import org.apache.kylin.metadata.project.ProjectInstance; -import org.apache.kylin.metrics.lib.SinkTool; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.apache.kylin.stream.core.source.StreamingSourceConfig; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; +import org.apache.kylin.tool.metrics.systemcube.streamingv2.KafkaTopicCreator; +import org.apache.kylin.tool.metrics.systemcube.streamingv2.StreamingMetadataCreator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Strings; import com.google.common.collect.Lists; +/** + * System Cube Metadata Creator CLI + */ public class SCCreator extends AbstractApplication { private static final Logger logger = LoggerFactory.getLogger(SCCreator.class); @@ -74,8 +77,10 @@ public class SCCreator extends AbstractApplication { private static final String D_PROJECT = "project/"; private static final String D_TABLE = "table/"; private static final String D_MODEL_DESC = "model_desc/"; + private static final String D_STREAMING_V2 = "streaming_v2/"; private static final String F_HIVE_SQL = "create_hive_tables_for_system_cubes"; + private static final String F_KAFKA_TOPIC = "create_kafka_topic_for_system_cubes"; protected final Options options; @@ -104,7 +109,7 @@ public class SCCreator extends AbstractApplication { String output = optionsHelper.getOptionValue(OPTION_OUTPUT); String inputConfig = optionsHelper.getOptionValue(OPTION_INPUT_CONFIG); if (Strings.isNullOrEmpty(inputConfig)) { - throw new RuntimeException("Input configuration file should be specified!!!"); + throw new FileNotFoundException("Input configuration file should be specified!!!"); } execute(owner, output, inputConfig); @@ -118,37 +123,51 @@ public class SCCreator extends AbstractApplication { output += "/"; } - Set<SinkTool> sourceToolSet = JsonUtil.readValueWithTyping( - new BufferedInputStream(new FileInputStream(new File(inputConfig))), HashSet.class); - run(owner, output, sourceToolSet); + TypeReference<List<MetricsSinkDesc>> typeRef = new TypeReference<List<MetricsSinkDesc>>() { + }; + List<MetricsSinkDesc> metricsSinkDescList = JsonUtil.readValue(FileUtils.readFileToString(new File(inputConfig)), typeRef); + run(owner, output, metricsSinkDescList); } - private void run(String owner, String output, Collection<SinkTool> sinkToolSet) throws IOException { + private void run(String owner, String output, List<MetricsSinkDesc> sinkToolSet) throws IOException { List<TableDesc> kylinTables = Lists.newArrayList(); List<DataModelDesc> kylinModels = Lists.newArrayList(); - List<CubeDesc> kylinCubeDescs = Lists.newArrayList(); + List<CubeDesc> cubeDescList = Lists.newArrayList(); List<CubeInstance> kylinCubeInstances = Lists.newArrayList(); + List<StreamingSourceConfig> streamingSourceConfigs = Lists.newArrayList(); - boolean ifHive = false; - for (SinkTool sourceTool : sinkToolSet) { - if (sourceTool instanceof HiveSinkTool) { - ifHive = true; - } else { - logger.warn("current version only support hive sink!!!"); - continue; + + boolean hasHive = false; + boolean hasKafka = false; + for (MetricsSinkDesc sinkDesc : sinkToolSet) { + if (sinkDesc.useHive()) { + hasHive = true; + } else if (sinkDesc.useKafka()) { + hasKafka = true; + } + kylinTables.addAll(generateKylinTableForSystemCube(sinkDesc)); + kylinModels.addAll(generateKylinModelForSystemCube(owner, sinkDesc)); + cubeDescList.addAll(generateKylinCubeDescForSystemCube(sinkDesc)); + kylinCubeInstances.addAll(generateKylinCubeInstanceForSystemCube(owner, sinkDesc)); + + if (sinkDesc.useKafka()) { + streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsJob(config, sinkDesc)); + streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsJobException(config, sinkDesc)); + streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQuery(config, sinkDesc)); + streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQueryRpcCall(config, sinkDesc)); + streamingSourceConfigs.add(StreamingMetadataCreator.generateKylinTableForMetricsQueryCube(config, sinkDesc)); } - kylinTables.addAll(generateKylinTableForSystemCube(sourceTool)); - kylinModels.addAll(generateKylinModelForSystemCube(owner, sourceTool)); - kylinCubeDescs.addAll(generateKylinCubeDescForSystemCube(sourceTool)); - kylinCubeInstances.addAll(generateKylinCubeInstanceForSystemCube(owner, sourceTool)); } - if (ifHive) { + if (hasHive) { generateHiveTableSQLFileForSystemCube(output); } + if (hasKafka) { + generateKafkaTopicFileForSystemCube(output); + } ProjectInstance projectInstance = ProjectCreator.generateKylinProjectInstance(owner, kylinTables, kylinModels, - kylinCubeDescs); + cubeDescList); generateKylinProjectFileForSystemCube(output, projectInstance); for (TableDesc tableDesc : kylinTables) { generateKylinTableFileForSystemCube(output, tableDesc); @@ -156,54 +175,58 @@ public class SCCreator extends AbstractApplication { for (DataModelDesc dataModelDesc : kylinModels) { generateKylinModelFileForSystemCube(output, dataModelDesc); } - for (CubeDesc cubeDesc : kylinCubeDescs) { + for (CubeDesc cubeDesc : cubeDescList) { generateKylinCubeDescFileForSystemCube(output, cubeDesc); } for (CubeInstance cubeInstance : kylinCubeInstances) { generateKylinCubeInstanceFileForSystemCube(output, cubeInstance); } + + for (StreamingSourceConfig sourceConfig : streamingSourceConfigs) { + generateKylinStreamingConfigForSystemCube(output, sourceConfig); + } } - private List<TableDesc> generateKylinTableForSystemCube(SinkTool sinkTool) { + private List<TableDesc> generateKylinTableForSystemCube(MetricsSinkDesc sinkDesc) { List<TableDesc> result = Lists.newLinkedList(); - result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkTool)); - result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkTool)); - result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkTool)); - result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkTool)); - result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkTool)); + result.add(KylinTableCreator.generateKylinTableForMetricsQuery(config, sinkDesc)); + result.add(KylinTableCreator.generateKylinTableForMetricsQueryCube(config, sinkDesc)); + result.add(KylinTableCreator.generateKylinTableForMetricsQueryRPC(config, sinkDesc)); + result.add(KylinTableCreator.generateKylinTableForMetricsJob(config, sinkDesc)); + result.add(KylinTableCreator.generateKylinTableForMetricsJobException(config, sinkDesc)); return result; } - private List<DataModelDesc> generateKylinModelForSystemCube(String owner, SinkTool sinkTool) { + private List<DataModelDesc> generateKylinModelForSystemCube(String owner, MetricsSinkDesc sinkDesc) { List<DataModelDesc> result = Lists.newLinkedList(); - result.add(ModelCreator.generateKylinModelForMetricsQuery(owner, config, sinkTool)); - result.add(ModelCreator.generateKylinModelForMetricsQueryCube(owner, config, sinkTool)); - result.add(ModelCreator.generateKylinModelForMetricsQueryRPC(owner, config, sinkTool)); - result.add(ModelCreator.generateKylinModelForMetricsJob(owner, config, sinkTool)); - result.add(ModelCreator.generateKylinModelForMetricsJobException(owner, config, sinkTool)); + result.add(ModelCreator.generateKylinModelForMetricsQuery(owner, config, sinkDesc)); + result.add(ModelCreator.generateKylinModelForMetricsQueryCube(owner, config, sinkDesc)); + result.add(ModelCreator.generateKylinModelForMetricsQueryRPC(owner, config, sinkDesc)); + result.add(ModelCreator.generateKylinModelForMetricsJob(owner, config, sinkDesc)); + result.add(ModelCreator.generateKylinModelForMetricsJobException(owner, config, sinkDesc)); return result; } - private List<CubeDesc> generateKylinCubeDescForSystemCube(SinkTool sinkTool) { + private List<CubeDesc> generateKylinCubeDescForSystemCube(MetricsSinkDesc sinkDesc) { List<CubeDesc> result = Lists.newLinkedList(); - result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkTool)); - result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkTool)); - result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkTool)); - result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkTool)); - result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkTool)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQuery(config, sinkDesc)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryCube(config, sinkDesc)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsQueryRPC(config, sinkDesc)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsJob(config, sinkDesc)); + result.add(CubeDescCreator.generateKylinCubeDescForMetricsJobException(config, sinkDesc)); return result; } - private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, SinkTool sinkTool) { + private List<CubeInstance> generateKylinCubeInstanceForSystemCube(String owner, MetricsSinkDesc sinkDesc) { List<CubeInstance> result = Lists.newLinkedList(); - result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkTool)); - result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkTool)); - result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkTool)); - result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkTool)); - result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkTool)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQuery(owner, config, sinkDesc)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryCube(owner, config, sinkDesc)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsQueryRPC(owner, config, sinkDesc)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJob(owner, config, sinkDesc)); + result.add(CubeInstanceCreator.generateKylinCubeInstanceForMetricsJobException(owner, config, sinkDesc)); return result; } @@ -213,6 +236,11 @@ public class SCCreator extends AbstractApplication { saveToFile(output + F_HIVE_SQL + ".sql", contents); } + private void generateKafkaTopicFileForSystemCube(String output) throws IOException { + String contents = KafkaTopicCreator.generateCreateCommand(config); + saveToFile(output + F_KAFKA_TOPIC + ".sh", contents); + } + private void generateKylinTableFileForSystemCube(String output, TableDesc kylinTable) throws IOException { saveSystemCubeMetadataToFile(output + D_TABLE + kylinTable.getIdentity() + ".json", kylinTable, TableMetadataManager.TABLE_SERIALIZER); @@ -223,6 +251,11 @@ public class SCCreator extends AbstractApplication { ModelCreator.MODELDESC_SERIALIZER); } + private void generateKylinStreamingConfigForSystemCube(String output, StreamingSourceConfig streamingConfig) throws IOException { + saveSystemCubeMetadataToFile(output + D_STREAMING_V2 + streamingConfig.getName() + ".json", streamingConfig, + StreamingMetadataCreator.STREAMING_SOURCE_CONFIG_SERIALIZER); + } + private void generateKylinCubeInstanceFileForSystemCube(String output, CubeInstance cubeInstance) throws IOException { saveSystemCubeMetadataToFile(output + D_CUBE_INSTANCE + cubeInstance.getName() + ".json", cubeInstance, @@ -241,7 +274,7 @@ public class SCCreator extends AbstractApplication { } private <T extends RootPersistentEntity> void saveSystemCubeMetadataToFile(String fileName, T metadata, - Serializer serializer) throws IOException { + Serializer serializer) throws IOException { ByteArrayOutputStream buf = new ByteArrayOutputStream(); DataOutputStream dout = new DataOutputStream(buf); serializer.serialize(metadata, dout); diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java similarity index 62% rename from tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java rename to tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java index 5907bf2..8dcf01e 100644 --- a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/util/HiveSinkTool.java +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/def/MetricsSinkDesc.java @@ -14,24 +14,27 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ -package org.apache.kylin.tool.metrics.systemcube.util; +package org.apache.kylin.tool.metrics.systemcube.def; import java.util.Map; import org.apache.kylin.metadata.model.ISourceAware; import org.apache.kylin.metadata.model.IStorageAware; -import org.apache.kylin.metrics.lib.SinkTool; import org.apache.kylin.metrics.lib.impl.hive.HiveReservoirReporter; import com.fasterxml.jackson.annotation.JsonAutoDetect; import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.collect.Maps; +import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter; @SuppressWarnings("serial") @JsonAutoDetect(fieldVisibility = JsonAutoDetect.Visibility.NONE, getterVisibility = JsonAutoDetect.Visibility.NONE, isGetterVisibility = JsonAutoDetect.Visibility.NONE, setterVisibility = JsonAutoDetect.Visibility.NONE) -public class HiveSinkTool implements SinkTool { +public class MetricsSinkDesc { + + @JsonProperty("sink") + private String sinkType; @JsonProperty("storage_type") protected int storageType = IStorageAware.ID_SHARDED_HBASE; @@ -39,16 +42,31 @@ public class HiveSinkTool implements SinkTool { @JsonProperty("cube_desc_override_properties") protected Map<String, String> cubeDescOverrideProperties = Maps.newHashMap(); + @JsonProperty("table_properties") + protected Map<String, String> tableProperties = Maps.newHashMap(); + public int getStorageType() { return storageType; } public int getSourceType() { - return ISourceAware.ID_HIVE; + if (sinkType.equalsIgnoreCase("hive")) { + return ISourceAware.ID_HIVE; + } else if (sinkType.equalsIgnoreCase("kafka")) { + return ISourceAware.ID_KAFKA; + } else { + return ISourceAware.ID_HIVE; + } } public String getTableNameForMetrics(String subject) { - return HiveReservoirReporter.getTableFromSubject(subject); + if (sinkType.equalsIgnoreCase("hive")) { + return HiveReservoirReporter.getTableFromSubject(subject); + } else if (sinkType.equalsIgnoreCase("kafka")) { + return KafkaReservoirReporter.getTableFromSubject(subject); + } else { + return HiveReservoirReporter.getTableFromSubject(subject); + } } public Map<String, String> getCubeDescOverrideProperties() { @@ -58,4 +76,20 @@ public class HiveSinkTool implements SinkTool { public void setCubeDescOverrideProperties(Map<String, String> cubeDescOverrideProperties) { this.cubeDescOverrideProperties = cubeDescOverrideProperties; } + + public String getSinkType() { + return sinkType; + } + + public boolean useKafka(){ + return sinkType.equalsIgnoreCase("kafka"); + } + public boolean useHive(){ + return sinkType.equalsIgnoreCase("hive"); + } + + + public Map<String, String> getTableProperties() { + return tableProperties; + } } diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java new file mode 100644 index 0000000..c495919 --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/KafkaTopicCreator.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.tool.metrics.systemcube.streamingv2; + +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter; + +public class KafkaTopicCreator { + + public static String generateCreateCommand(KylinConfig config) { + StringBuilder sb = new StringBuilder(); + String[] topics = new String[]{ + config.getKylinMetricsSubjectQuery(), + config.getKylinMetricsSubjectQueryCube(), + config.getKylinMetricsSubjectQueryRpcCall(), + config.getKylinMetricsSubjectJob(), + config.getKylinMetricsSubjectJobException()}; + for (String topic : topics) { + String finalTopic = KafkaReservoirReporter.decorateTopic(topic); + sb.append("sh kafka-topics"); + sb.append(" --create"); + sb.append(" --topic "); + sb.append(finalTopic); + sb.append(" --zookeeper localhost:2181"); + sb.append(" --partitions 3"); + sb.append(" --replication-factor 1"); + sb.append(" \n"); + } + return sb.toString(); + } +} diff --git a/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java new file mode 100644 index 0000000..93121ce --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/metrics/systemcube/streamingv2/StreamingMetadataCreator.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kylin.tool.metrics.systemcube.streamingv2; + +import com.google.common.collect.Lists; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.persistence.JsonSerializer; +import org.apache.kylin.common.persistence.Serializer; +import org.apache.kylin.common.util.Pair; +import org.apache.kylin.metrics.lib.impl.kafka.KafkaReservoirReporter; +import org.apache.kylin.stream.core.source.MessageParserInfo; +import org.apache.kylin.stream.core.source.StreamingSourceConfig; +import org.apache.kylin.tool.metrics.systemcube.HiveTableCreator; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class StreamingMetadataCreator { + + public static final Serializer<StreamingSourceConfig> STREAMING_SOURCE_CONFIG_SERIALIZER = new JsonSerializer<>(StreamingSourceConfig.class); + + + public static StreamingSourceConfig generateKylinTableForMetricsQuery(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQuery()); + return generateStreamingV2Config(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQuery(), columns); + } + + public static StreamingSourceConfig generateKylinTableForMetricsQueryCube(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryCube()); + return generateStreamingV2Config(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryCube(), columns); + } + + public static StreamingSourceConfig generateKylinTableForMetricsQueryRpcCall(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsQueryRPC()); + return generateStreamingV2Config(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectQueryRpcCall(), columns); + } + + public static StreamingSourceConfig generateKylinTableForMetricsJob(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJob()); + return generateStreamingV2Config(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectJob(), columns); + } + + public static StreamingSourceConfig generateKylinTableForMetricsJobException(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc) { + List<Pair<String, String>> columns = Lists.newLinkedList(); + columns.addAll(HiveTableCreator.getHiveColumnsForMetricsJobException()); + return generateStreamingV2Config(kylinConfig, sinkDesc, kylinConfig.getKylinMetricsSubjectJobException(), columns); + } + + private static StreamingSourceConfig generateStreamingV2Config(KylinConfig kylinConfig, MetricsSinkDesc sinkDesc, String subject, List<Pair<String, String>> columns) { + StreamingSourceConfig streamingSourceConfig = new StreamingSourceConfig(); + MessageParserInfo parserInfo = new MessageParserInfo(); + parserInfo.setFormatTs(false); + parserInfo.setTsColName("KTIMESTAMP"); + parserInfo.setTsPattern("MS"); + parserInfo.setTsParser("org.apache.kylin.stream.source.kafka.LongTimeParser"); + Map<String, String> columnToSourceFieldMapping = new HashMap<>(); + for (Pair<String, String> col : columns) { + columnToSourceFieldMapping.put(col.getKey(), col.getKey()); + } + parserInfo.setColumnToSourceFieldMapping(columnToSourceFieldMapping); + + Map<String, String> properties = new HashMap<>(); + String topic = KafkaReservoirReporter.decorateTopic(subject); + String table = KafkaReservoirReporter.sink.getTableFromSubject(subject); + + properties.put("topic", topic); + properties.put("bootstrap.servers", sinkDesc.getTableProperties().get("bootstrap.servers")); + + streamingSourceConfig.setName(table); + streamingSourceConfig.setProperties(properties); + streamingSourceConfig.setParserInfo(parserInfo); + + streamingSourceConfig.updateRandomUuid(); + streamingSourceConfig.setLastModified(System.currentTimeMillis()); + + return streamingSourceConfig; + } + +} diff --git a/tool/src/main/resources/SCSinkTools.json b/tool/src/main/resources/SCSinkTools.json index 2872117..101ce0c 100644 --- a/tool/src/main/resources/SCSinkTools.json +++ b/tool/src/main/resources/SCSinkTools.json @@ -1,15 +1,24 @@ [ - [ - "org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool", - { - "storage_type": 2, - "cube_desc_override_properties": [ - "java.util.HashMap", - { - "kylin.cube.algorithm": "INMEM", - "kylin.cube.max-building-segments": "1" - } - ] + { + "sink": "hive", + "storage_type": 2, + "cube_desc_override_properties": { + "kylin.cube.algorithm": "INMEM", + "kylin.cube.max-building-segments": "1" } - ] + }, + { + "sink": "kafka", + "storage_type": 3, + "cube_desc_override_properties": { + "kylin.cube.algorithm": "INMEM", + "kylin.stream.cube.window": 28800, + "kylin.stream.cube.duration": 3600, + "kylin.stream.segment.retention.policy": "purge", + "kylin.cube.max-building-segments": "20" + }, + "table_properties": { + "bootstrap.servers": "localhost:9092" + } + } ] \ No newline at end of file diff --git a/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java index ee553ab..235b4f1 100644 --- a/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java +++ b/tool/src/test/java/org/apache/kylin/tool/metrics/systemcube/SCCreatorTest.java @@ -14,39 +14,33 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.tool.metrics.systemcube; import static org.junit.Assert.assertEquals; -import java.io.BufferedInputStream; -import java.io.BufferedOutputStream; import java.io.File; -import java.io.FileInputStream; import java.io.FileOutputStream; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import com.fasterxml.jackson.core.type.TypeReference; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.io.FileUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.util.JsonUtil; import org.apache.kylin.common.util.LocalFileMetadataTestCase; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.metadata.realization.RealizationStatusEnum; -import org.apache.kylin.metrics.lib.SinkTool; -import org.apache.kylin.tool.metrics.systemcube.util.HiveSinkTool; +import org.apache.kylin.tool.metrics.systemcube.def.MetricsSinkDesc; import org.junit.After; import org.junit.Assert; import org.junit.Before; import org.junit.Test; -import com.fasterxml.jackson.databind.ObjectMapper; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - public class SCCreatorTest extends LocalFileMetadataTestCase { private File tempMetadataDir; @@ -84,7 +78,7 @@ public class SCCreatorTest extends LocalFileMetadataTestCase { CubeManager cubeManager = CubeManager.getInstance(local); List<CubeInstance> cubeList = cubeManager.listAllCubes(); System.out.println("System cubes: " + cubeList); - assertEquals(cubeList.size(), 5); + assertEquals(cubeList.size(), 10); for (CubeInstance cube : cubeList) { Assert.assertTrue(cube.getStatus() != RealizationStatusEnum.DESCBROKEN); @@ -96,18 +90,15 @@ public class SCCreatorTest extends LocalFileMetadataTestCase { Map<String, String> cubeDescOverrideProperties = Maps.newHashMap(); cubeDescOverrideProperties.put("kylin.cube.algorithm", "INMEM"); - HiveSinkTool hiveSinkTool = new HiveSinkTool(); - hiveSinkTool.setCubeDescOverrideProperties(cubeDescOverrideProperties); + MetricsSinkDesc metricsSinkDesc = new MetricsSinkDesc(); + metricsSinkDesc.setCubeDescOverrideProperties(cubeDescOverrideProperties); + List<MetricsSinkDesc> metricsSinkDescList = Lists.newArrayList(); String outputPath = "src/test/resources/SCSinkTools.json"; - try (BufferedOutputStream os = new BufferedOutputStream(new FileOutputStream(outputPath))) { - ObjectMapper mapper = new ObjectMapper(); - mapper.enableDefaultTyping(); - mapper.writeValue(os, Sets.newHashSet(hiveSinkTool)); - } + JsonUtil.writeValue(new FileOutputStream(outputPath), metricsSinkDescList); - Set<SinkTool> sinkToolSet = readSinkToolsJson(outputPath); - for (SinkTool entry : sinkToolSet) { + List<MetricsSinkDesc> sinkToolSet = readSinkToolsJson(outputPath); + for (MetricsSinkDesc entry : sinkToolSet) { Map<String, String> props = entry.getCubeDescOverrideProperties(); for (String key : cubeDescOverrideProperties.keySet()) { assertEquals(props.get(key), cubeDescOverrideProperties.get(key)); @@ -117,18 +108,17 @@ public class SCCreatorTest extends LocalFileMetadataTestCase { @Test public void testReadSinkToolsJson() throws Exception { - Set<SinkTool> sinkToolSet = readSinkToolsJson("src/main/resources/SCSinkTools.json"); - for (SinkTool entry : sinkToolSet) { + List<MetricsSinkDesc> sinkToolSet = readSinkToolsJson("src/main/resources/SCSinkTools.json"); + for (MetricsSinkDesc entry : sinkToolSet) { Map<String, String> props = entry.getCubeDescOverrideProperties(); assertEquals(props.get("kylin.cube.algorithm"), "INMEM"); } } - private Set<SinkTool> readSinkToolsJson(String jsonPath) throws Exception { - try (BufferedInputStream is = new BufferedInputStream(new FileInputStream(jsonPath))) { - ObjectMapper mapper = new ObjectMapper(); - mapper.enableDefaultTyping(); - return mapper.readValue(is, HashSet.class); - } + private List<MetricsSinkDesc> readSinkToolsJson(String jsonPath) throws Exception { + TypeReference<List<MetricsSinkDesc>> typeRef = new TypeReference<List<MetricsSinkDesc>>() { + }; + List<MetricsSinkDesc> sourceToolSet = JsonUtil.readValue(FileUtils.readFileToString(new File(jsonPath)), typeRef); + return sourceToolSet; } } \ No newline at end of file