This is an automated email from the ASF dual-hosted git repository. nic pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push: new 4d65418 KYLIN-3905 Disable shrunken dict if Mr-Hive dict is enabled & code format 4d65418 is described below commit 4d654188e2e3a09fec14898f3c49257eb7c07885 Author: XiaoxiangYu <hit_la...@126.com> AuthorDate: Mon Jul 22 11:13:34 2019 +0800 KYLIN-3905 Disable shrunken dict if Mr-Hive dict is enabled & code format 1. disable shrunken dict if Mr-Hive dict is enabled 2. reformat core-metrics module --- .../org/apache/kylin/common/KylinConfigBase.java | 12 +++++- .../apache/kylin/common/livy/LivyRestClient.java | 2 +- .../java/org/apache/kylin/cube/model/CubeDesc.java | 11 +++++- .../apache/kylin/job/metrics/JobMetricsFacade.java | 3 -- .../org/apache/kylin/metrics/MetricsManager.java | 10 ++++- .../apache/kylin/metrics/lib/ActiveReservoir.java | 19 ++++++++- .../kylin/metrics/lib/ActiveReservoirReporter.java | 5 ++- .../java/org/apache/kylin/metrics/lib/Record.java | 3 ++ .../java/org/apache/kylin/metrics/lib/Sink.java | 3 ++ .../metrics/lib/impl/AbstractActiveReservoir.java | 3 ++ .../metrics/lib/impl/BaseScheduledReporter.java | 4 ++ .../kylin/metrics/lib/impl/BlockingReservoir.java | 46 +++++++++++++++------- .../kylin/metrics/lib/impl/InstantReservoir.java | 3 ++ .../kylin/metrics/lib/impl/StubReservoir.java | 3 ++ .../kylin/metrics/property/JobPropertyEnum.java | 35 ++++++++++------ .../metrics/property/QueryCubePropertyEnum.java | 28 ++++++++++--- .../kylin/metrics/property/QueryPropertyEnum.java | 20 ++++++++-- .../metrics/property/QueryRPCPropertyEnum.java | 22 ++++++++--- .../kylin/engine/mr/BatchCubingJobBuilder2.java | 2 +- .../kylin/metrics/lib/impl/hive/HiveProducer.java | 27 +++++++------ .../metrics/lib/impl/hive/HiveProducerRecord.java | 3 ++ .../lib/impl/hive/HiveReservoirReporter.java | 6 +-- server/src/main/resources/kylinMetrics.xml | 9 ++++- .../apache/kylin/source/hive/HiveInputBase.java | 3 +- 24 files changed, 208 insertions(+), 74 deletions(-) diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6df0dd3..be37c55 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -556,12 +556,16 @@ public abstract class KylinConfigBase implements Serializable { // mr-hive dict // ============================================================================ + /** + * @return if mr-hive dict not enabled, return empty array; + * else return array contains "{TABLE_NAME}_{COLUMN_NAME}" + */ public String[] getMrHiveDictColumns() { String columnStr = getOptional("kylin.dictionary.mr-hive.columns", ""); - if (!StringUtils.isEmpty(columnStr)) { + if (!columnStr.equals("")) { return columnStr.split(","); } - return null; + return new String[0]; } public String getMrHiveDictDB() { @@ -1446,6 +1450,10 @@ public abstract class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.engine.livy-conf.livy-enabled", FALSE)); } + public String getLivyRestApiBacktick() { + return getOptional("kylin.engine.livy.backtick.quote", ""); + } + public String getLivyUrl() { return getOptional("kylin.engine.livy-conf.livy-url"); } diff --git a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java index e58dcb6..d2462d4 100644 --- a/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java +++ b/core-common/src/main/java/org/apache/kylin/common/livy/LivyRestClient.java @@ -70,7 +70,7 @@ public class LivyRestClient { // Because livy submit job use JDK's ProcessBuilder, here we need to quote backtick // otherwise backtick make livy throw org.apache.spark.sql.catalyst.parser.ParseException - String json = jobJson.replace("`", "\\\\`"); + String json = jobJson.replace("`", 
config.getLivyRestApiBacktick()); post.setEntity(new StringEntity(json, "UTF-8")); HttpResponse response = client.execute(post); diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 8b45837..f5bb427 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -1510,12 +1510,19 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } public boolean isShrunkenDictFromGlobalEnabled() { - return config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty(); + boolean needShrunkenDict = config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty(); + boolean needMrHiveDict = config.getMrHiveDictColumns().length > 0; + if (needMrHiveDict && needShrunkenDict) { + logger.info("ShrunkenDict cannot work with MrHiveDict, so shutdown ShrunkenDict."); + return false; + } else { + return needShrunkenDict; + } } // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns public List<TblColRef> getAllUHCColumns() { - List<TblColRef> uhcColumns = new ArrayList<TblColRef>(); + List<TblColRef> uhcColumns = new ArrayList<>(); uhcColumns.addAll(getAllGlobalDictColumns()); uhcColumns.addAll(getShardByColumns()); return uhcColumns; diff --git a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java index 5a8caa8..e9d41bf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java +++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java @@ -35,9 +35,6 @@ public class JobMetricsFacade { if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) { return; } - /** - * report job related metrics - */ RecordEvent metricsEvent; if 
(jobStats.throwable == null) { metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java index aff8cc9..a5f1088 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/MetricsManager.java @@ -43,6 +43,9 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +/** + * A metric system using a system cube to store/analyze metric information. + */ public class MetricsManager { public static final String SYSTEM_PROJECT = "KYLIN_SYSTEM"; @@ -62,6 +65,9 @@ public class MetricsManager { return instance; } + /** + * This method is called by Spring Framework at kylinMetrics.xml + */ public static void initMetricsManager(Sink systemCubeSink, Map<ActiveReservoir, List<Pair<String, Properties>>> sourceReporterBindProperties) { setSystemCubeSink(systemCubeSink); @@ -99,10 +105,10 @@ public class MetricsManager { if (ActiveReservoirReporter.class.isAssignableFrom(clz)) { values.add(new Pair(clz, entry.getSecond())); } else { - logger.warn("The class " + clz + " is not a sub class of " + ActiveReservoir.class); + logger.warn("The class {} is not a sub class of {}.", clz, ActiveReservoir.class); } } catch (ClassNotFoundException e) { - logger.warn("Cannot find class " + entry.getFirst()); + logger.warn("Cannot find class {}", entry.getFirst()); } } } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java index 36ab759..542a781 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoir.java @@ -14,18 +14,32 @@ * 
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.metrics.lib; import java.io.Closeable; +/** + * Reservoir for metrics message, a Reservoir(something like cache)'s duty is store metrics message temporarily + * and emit messages to external Sink by notifying specific ActiveReservoirListener. + */ public interface ActiveReservoir extends Closeable { + /** + * @return how many metrics message was currently cached(not emit) + */ int size(); + /** + * stage metrics message into Reservoir, but whether to emit it to external storage + * immediately is decided by specific implementation + */ void update(Record record); + /** + * add listener which responds to message update + */ void addListener(ActiveReservoirListener listener); void removeListener(ActiveReservoirListener listener); @@ -34,6 +48,9 @@ public interface ActiveReservoir extends Closeable { void setHAListener(ActiveReservoirListener listener); + /** + * do some prepare to accept metrics message + */ void start(); void stop(); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java index 463aa92..fa807e8 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/ActiveReservoirReporter.java @@ -26,6 +26,9 @@ import org.apache.kylin.common.util.Pair; import com.google.common.base.Strings; +/** + * ActiveReservoirReporter report metrics message from ActiveReservoir + */ public abstract class ActiveReservoirReporter implements Closeable { public static final String KYLIN_PREFIX = KylinConfig.getInstanceFromEnv().getKylinMetricsPrefix(); @@ -39,7 +42,7 @@ public abstract class ActiveReservoirReporter implements Closeable { int i = 
0; String database = splits.length == 1 ? KYLIN_PREFIX : splits[i++]; String tableNameOnly = splits[i]; - return new Pair(database, tableNameOnly); + return new Pair<>(database, tableNameOnly); } public static String getTableName(Pair<String, String> tableNameSplits) { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java index a1bce1f..112951ad 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Record.java @@ -20,6 +20,9 @@ package org.apache.kylin.metrics.lib; import java.util.Map; +/** + * Metrics message + */ public interface Record { /** diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java index dff71bd..a04356d 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/Sink.java @@ -18,6 +18,9 @@ package org.apache.kylin.metrics.lib; +/** + * Sink is where metrics data is written to + */ public interface Sink { String getTableFromSubject(String subject); } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java index cc72710..3c116d5 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/AbstractActiveReservoir.java @@ -61,6 +61,9 @@ public abstract class AbstractActiveReservoir implements ActiveReservoir { isReady = false; } + /** + * closed by shutdown hook at MetricsSystem + */ public void close() { stop(); removeAllListener(); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java 
b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java index 531376a..c006352 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BaseScheduledReporter.java @@ -28,6 +28,10 @@ import org.slf4j.LoggerFactory; import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * @deprecated abandon code which seems to be replaced by ActiveReservoirReporter, should be removed later + */ +@Deprecated public abstract class BaseScheduledReporter implements Closeable { private static final Logger logger = LoggerFactory.getLogger(BaseScheduledReporter.class); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java index 2758adb..d754b19 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/BlockingReservoir.java @@ -30,16 +30,23 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ThreadFactoryBuilder; +/** + * A Reservoir which staged metrics message in memory, and emit them in fixed rate. + * This will help to reduce pressure to underlying resource. 
+ */ public class BlockingReservoir extends AbstractActiveReservoir { private static final Logger logger = LoggerFactory.getLogger(BlockingReservoir.class); private static final int MAX_QUEUE_SIZE = 50000; + /** + * Cache for metrics message with max size is maxReportSize + */ private final BlockingQueue<Record> recordsQueue; private final Thread scheduledReporter; - private final int MIN_REPORT_SIZE; - private final int MAX_REPORT_SIZE; - private final long MAX_REPORT_TIME; + private final int minReportSize; + private final int maxReportSize; + private final long maxReportTime; private List<Record> records; public BlockingReservoir() { @@ -50,20 +57,22 @@ public class BlockingReservoir extends AbstractActiveReservoir { this(minReportSize, maxReportSize, 10); } - public BlockingReservoir(int minReportSize, int maxReportSize, int MAX_REPORT_TIME) { - this.MAX_REPORT_SIZE = maxReportSize; - this.MIN_REPORT_SIZE = minReportSize; - this.MAX_REPORT_TIME = MAX_REPORT_TIME * 60 * 1000L; + public BlockingReservoir(int minReportSize, int maxReportSize, int maxReportTime) { + this.minReportSize = minReportSize; + this.maxReportSize = maxReportSize; + this.maxReportTime = maxReportTime * 60 * 1000L; this.recordsQueue = new LinkedBlockingQueue<>(MAX_QUEUE_SIZE); this.listeners = Lists.newArrayList(); - this.records = Lists.newArrayListWithExpectedSize(MAX_REPORT_SIZE); - + this.records = Lists.newArrayListWithExpectedSize(this.maxReportSize); scheduledReporter = new ThreadFactoryBuilder().setNameFormat("metrics-blocking-reservoir-scheduler-%d").build() .newThread(new ReporterRunnable()); } + /** + * put record into queue but wait if queue is full + */ public void update(Record record) { if (!isReady) { logger.info("Current reservoir is not ready for update record"); @@ -72,7 +81,7 @@ public class BlockingReservoir extends AbstractActiveReservoir { try { recordsQueue.put(record); } catch (InterruptedException e) { - logger.warn("Thread is interrupted during putting value to 
blocking queue. \n" + e.toString()); + logger.warn("Thread is interrupted during putting value to blocking queue.", e); } catch (IllegalArgumentException e) { logger.warn("The record queue may be full"); } @@ -88,7 +97,7 @@ public class BlockingReservoir extends AbstractActiveReservoir { recordsQueue.drainTo(records); } else { records.clear(); - recordsQueue.drainTo(records, MAX_REPORT_SIZE); + recordsQueue.drainTo(records, maxReportSize); } boolean ifSucceed = true; @@ -118,11 +127,13 @@ public class BlockingReservoir extends AbstractActiveReservoir { return true; } + @Override public void start() { super.start(); scheduledReporter.start(); } + @Override public void stop() { super.stop(); scheduledReporter.interrupt(); @@ -134,8 +145,13 @@ public class BlockingReservoir extends AbstractActiveReservoir { } } + /** + * A thread which try to check if staged message queue has meet size threshold and wait duration threshold + * and notify listener every one minute + */ class ReporterRunnable implements Runnable { + @Override public void run() { long startTime = System.currentTimeMillis(); while (isReady) { @@ -144,10 +160,10 @@ public class BlockingReservoir extends AbstractActiveReservoir { sleep(); startTime = System.currentTimeMillis(); continue; - } else if (size() < MIN_REPORT_SIZE && (System.currentTimeMillis() - startTime < MAX_REPORT_TIME)) { - logger.info("The number of records in the blocking queue is less than " + MIN_REPORT_SIZE + // - " and the duration from last reporting is less than " + MAX_REPORT_TIME - + "ms. Will delay to report!"); + } else if (size() < minReportSize && (System.currentTimeMillis() - startTime < maxReportTime)) { + logger.info("The number of records in the blocking queue is less than {} and " + + "the duration from last reporting is less than {} ms. 
" + + "Will delay to report!", minReportSize, maxReportTime); sleep(); continue; } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java index 7b6d710..6bd9539 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/InstantReservoir.java @@ -28,6 +28,9 @@ import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; +/** + * A Reservoir which don't staged metrics message at all, emit them in no time. + */ public class InstantReservoir extends AbstractActiveReservoir { private static final Logger logger = LoggerFactory.getLogger(InstantReservoir.class); diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java index fe69dec..6a417e4 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/lib/impl/StubReservoir.java @@ -22,6 +22,9 @@ import org.apache.kylin.metrics.lib.ActiveReservoir; import org.apache.kylin.metrics.lib.ActiveReservoirListener; import org.apache.kylin.metrics.lib.Record; +/** + * A Reservoir will drop message. 
+ */ public class StubReservoir implements ActiveReservoir { public void addListener(ActiveReservoirListener listener) { diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java index 3ca567e..e55cf2b 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/JobPropertyEnum.java @@ -18,19 +18,31 @@ package org.apache.kylin.metrics.property; -import java.util.Locale; - import com.google.common.base.Strings; +/** + * Definition of Metrics dimension and measure for Cube building job + */ public enum JobPropertyEnum { - ID_CODE("JOB_ID"), USER("KUSER"), PROJECT("PROJECT"), CUBE("CUBE_NAME"), TYPE("JOB_TYPE"), ALGORITHM( - "CUBING_TYPE"), STATUS("JOB_STATUS"), EXCEPTION("EXCEPTION"), // - SOURCE_SIZE("TABLE_SIZE"), CUBE_SIZE("CUBE_SIZE"), BUILD_DURATION("DURATION"), WAIT_RESOURCE_TIME( - "WAIT_RESOURCE_TIME"), PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), STEP_DURATION_DISTINCT_COLUMNS( - "STEP_DURATION_DISTINCT_COLUMNS"), STEP_DURATION_DICTIONARY( - "STEP_DURATION_DICTIONARY"), STEP_DURATION_INMEM_CUBING( - "STEP_DURATION_INMEM_CUBING"), STEP_DURATION_HFILE_CONVERT( - "STEP_DURATION_HFILE_CONVERT"); + + ID_CODE("JOB_ID"), + USER("KUSER"), + PROJECT("PROJECT"), + CUBE("CUBE_NAME"), + TYPE("JOB_TYPE"), + ALGORITHM("CUBING_TYPE"), + STATUS("JOB_STATUS"), + EXCEPTION("EXCEPTION"), + + SOURCE_SIZE("TABLE_SIZE"), + CUBE_SIZE("CUBE_SIZE"), + BUILD_DURATION("DURATION"), + WAIT_RESOURCE_TIME("WAIT_RESOURCE_TIME"), + PER_BYTES_TIME_COST("PER_BYTES_TIME_COST"), + STEP_DURATION_DISTINCT_COLUMNS("STEP_DURATION_DISTINCT_COLUMNS"), + STEP_DURATION_DICTIONARY("STEP_DURATION_DICTIONARY"), + STEP_DURATION_INMEM_CUBING("STEP_DURATION_INMEM_CUBING"), + STEP_DURATION_HFILE_CONVERT("STEP_DURATION_HFILE_CONVERT"); private final String propertyName; @@ -43,11 +55,10 @@ 
public enum JobPropertyEnum { return null; } for (JobPropertyEnum property : JobPropertyEnum.values()) { - if (property.propertyName.equals(name.toUpperCase(Locale.ROOT))) { + if (property.propertyName.equalsIgnoreCase(name)) { return property; } } - return null; } diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java index 21477dc..61e46e5 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryCubePropertyEnum.java @@ -22,13 +22,29 @@ import java.util.Locale; import com.google.common.base.Strings; +/** + * Definition of Metrics dimension and measure for Query Advanced + */ public enum QueryCubePropertyEnum { - PROJECT("PROJECT"), CUBE("CUBE_NAME"), SEGMENT("SEGMENT_NAME"), CUBOID_SOURCE("CUBOID_SOURCE"), CUBOID_TARGET( - "CUBOID_TARGET"), IF_MATCH("IF_MATCH"), FILTER_MASK("FILTER_MASK"), IF_SUCCESS("IF_SUCCESS"), // - TIME_SUM("STORAGE_CALL_TIME_SUM"), TIME_MAX("STORAGE_CALL_TIME_MAX"), WEIGHT_PER_HIT("WEIGHT_PER_HIT"), CALL_COUNT( - "STORAGE_CALL_COUNT"), SKIP_COUNT("STORAGE_COUNT_SKIP"), SCAN_COUNT("STORAGE_COUNT_SCAN"), RETURN_COUNT( - "STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT( - "STORAGE_COUNT_AGGREGATE_FILTER"), AGGR_COUNT("STORAGE_COUNT_AGGREGATE"); + + PROJECT("PROJECT"), + CUBE("CUBE_NAME"), + SEGMENT("SEGMENT_NAME"), + CUBOID_SOURCE("CUBOID_SOURCE"), + CUBOID_TARGET("CUBOID_TARGET"), + IF_MATCH("IF_MATCH"), + FILTER_MASK("FILTER_MASK"), + IF_SUCCESS("IF_SUCCESS"), + + TIME_SUM("STORAGE_CALL_TIME_SUM"), + TIME_MAX("STORAGE_CALL_TIME_MAX"), + WEIGHT_PER_HIT("WEIGHT_PER_HIT"), + CALL_COUNT("STORAGE_CALL_COUNT"), + SKIP_COUNT("STORAGE_COUNT_SKIP"), + SCAN_COUNT("STORAGE_COUNT_SCAN"), + RETURN_COUNT("STORAGE_COUNT_RETURN"), + AGGR_FILTER_COUNT("STORAGE_COUNT_AGGREGATE_FILTER"), + 
AGGR_COUNT("STORAGE_COUNT_AGGREGATE"); private final String propertyName; diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java index 20da4ce..e9c51df 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryPropertyEnum.java @@ -22,11 +22,23 @@ import java.util.Locale; import com.google.common.base.Strings; +/** + * Definition of Metrics dimension and measure for Query Basic + */ public enum QueryPropertyEnum { - ID_CODE("QUERY_HASH_CODE"), TYPE("QUERY_TYPE"), USER("KUSER"), PROJECT("PROJECT"), REALIZATION( - "REALIZATION"), REALIZATION_TYPE("REALIZATION_TYPE"), EXCEPTION("EXCEPTION"), // - TIME_COST("QUERY_TIME_COST"), CALCITE_RETURN_COUNT("CALCITE_COUNT_RETURN"), STORAGE_RETURN_COUNT( - "STORAGE_COUNT_RETURN"), AGGR_FILTER_COUNT("CALCITE_COUNT_AGGREGATE_FILTER"); + + ID_CODE("QUERY_HASH_CODE"), + TYPE("QUERY_TYPE"), + USER("KUSER"), + PROJECT("PROJECT"), + REALIZATION("REALIZATION"), + REALIZATION_TYPE("REALIZATION_TYPE"), + EXCEPTION("EXCEPTION"), + + TIME_COST("QUERY_TIME_COST"), + CALCITE_RETURN_COUNT("CALCITE_COUNT_RETURN"), + STORAGE_RETURN_COUNT("STORAGE_COUNT_RETURN"), + AGGR_FILTER_COUNT("CALCITE_COUNT_AGGREGATE_FILTER"); private final String propertyName; diff --git a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java index 4366f0d..ba560b0 100644 --- a/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java +++ b/core-metrics/src/main/java/org/apache/kylin/metrics/property/QueryRPCPropertyEnum.java @@ -18,14 +18,24 @@ package org.apache.kylin.metrics.property; -import java.util.Locale; - import com.google.common.base.Strings; +/** + * Definition of Metrics 
dimension and measure for HBase RPC + */ public enum QueryRPCPropertyEnum { - PROJECT("PROJECT"), REALIZATION("REALIZATION"), RPC_SERVER("RPC_SERVER"), EXCEPTION("EXCEPTION"), // - CALL_TIME("CALL_TIME"), SKIP_COUNT("COUNT_SKIP"), SCAN_COUNT("COUNT_SCAN"), RETURN_COUNT( - "COUNT_RETURN"), AGGR_FILTER_COUNT("COUNT_AGGREGATE_FILTER"), AGGR_COUNT("COUNT_AGGREGATE"); + + PROJECT("PROJECT"), + REALIZATION("REALIZATION"), + RPC_SERVER("RPC_SERVER"), + EXCEPTION("EXCEPTION"), + + CALL_TIME("CALL_TIME"), + SKIP_COUNT("COUNT_SKIP"), + SCAN_COUNT("COUNT_SCAN"), + RETURN_COUNT("COUNT_RETURN"), + AGGR_FILTER_COUNT("COUNT_AGGREGATE_FILTER"), + AGGR_COUNT("COUNT_AGGREGATE"); private final String propertyName; @@ -38,7 +48,7 @@ public enum QueryRPCPropertyEnum { return null; } for (QueryRPCPropertyEnum property : QueryRPCPropertyEnum.values()) { - if (property.propertyName.equals(name.toUpperCase(Locale.ROOT))) { + if (property.propertyName.equalsIgnoreCase(name)) { return property; } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 0e78e9c..c566a13 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -49,7 +49,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { } public CubingJob build() { - logger.info("MR_V2 new job to BUILD segment " + seg); + logger.info("MR_V2 new job to BUILD segment {}", seg); final CubingJob result = CubingJob.createBuildJob(seg, submitter, config); final String jobId = result.getId(); diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java index ba1f151..74e389e 100644 --- 
a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducer.java @@ -55,12 +55,11 @@ import com.google.common.collect.Maps; public class HiveProducer { private static final Logger logger = LoggerFactory.getLogger(HiveProducer.class); - private static final int CACHE_MAX_SIZE = 10; private final HiveConf hiveConf; private final FileSystem fs; private final LoadingCache<Pair<String, String>, Pair<String, List<FieldSchema>>> tableFieldSchemaCache; - private final String CONTENT_FILE_PREFIX; + private final String contentFilePrefix; private String metricType; private String prePartitionPath; private Path curPartitionContentPath; @@ -99,7 +98,7 @@ public class HiveProducer { List<FieldSchema> fields = metaStoreClient.getFields(tableName.getFirst(), tableName.getSecond()); metaStoreClient.close(); - return new Pair(tableLocation, fields); + return new Pair<>(tableLocation, fields); } }); @@ -109,7 +108,7 @@ public class HiveProducer { } catch (UnknownHostException e) { hostName = "UNKNOWN"; } - CONTENT_FILE_PREFIX = hostName + "-" + System.currentTimeMillis() + "-part-"; + contentFilePrefix = hostName + "-" + System.currentTimeMillis() + "-part-"; } public void close() { @@ -137,6 +136,8 @@ public class HiveProducer { } private void write(RecordKey recordKey, Iterable<HiveProducerRecord> recordItr) throws Exception { + + // Step 1: determine partitionPath by record 's RecordKey String tableLocation = tableFieldSchemaCache.get(new Pair<>(recordKey.database(), recordKey.table())) .getFirst(); StringBuilder sb = new StringBuilder(); @@ -148,6 +149,8 @@ public class HiveProducer { sb.append(e.getValue()); } Path partitionPath = new Path(sb.toString()); + + // Step 2: create partition for hive table if not exists if (!fs.exists(partitionPath)) { StringBuilder hql = new StringBuilder(); hql.append("ALTER TABLE "); @@ -164,21 +167,22 @@ public class 
HiveProducer { hql.append("='" + e.getValue() + "'"); } hql.append(")"); + logger.debug("create partition by {}.", hql); Driver driver = new Driver(hiveConf); SessionState.start(new CliSessionState(hiveConf)); driver.run(hql.toString()); driver.close(); } + // Step 3: create path for new partition if it is the first time write metrics message or new partition should be used if (fout == null || prePartitionPath == null || prePartitionPath.compareTo(partitionPath.toString()) != 0) { if (fout != null) { + logger.debug("Flush output stream of previous partition path {}. Using a new one {}. ", prePartitionPath, partitionPath); closeFout(); } - Path partitionContentPath = new Path(partitionPath, - CONTENT_FILE_PREFIX + String.format(Locale.ROOT, "%04d", id)); - logger.info("Try to use new partition content path: " + partitionContentPath.toString() + " for metric: " - + metricType); + Path partitionContentPath = new Path(partitionPath, contentFilePrefix + String.format(Locale.ROOT, "%04d", id)); + logger.info("Try to use new partition content path: {} for metric: {}", partitionContentPath, metricType); if (!fs.exists(partitionContentPath)) { int nRetry = 0; while (!fs.createNewFile(partitionContentPath) && nRetry++ < 5) { @@ -188,7 +192,7 @@ public class HiveProducer { Thread.sleep(500L * nRetry); } if (!fs.exists(partitionContentPath)) { - throw new RuntimeException( + throw new IllegalStateException( "Fail to create HDFS file: " + partitionContentPath + " after " + nRetry + " retries"); } } @@ -198,14 +202,14 @@ public class HiveProducer { id = (id + 1) % 10; } + // Step 4: append record to HDFS without flush try { int count = 0; for (HiveProducerRecord elem : recordItr) { fout.writeBytes(elem.valueToString() + "\n"); count++; } - logger.info("Success to write " + count + " metrics(" + metricType + ") to file " - + curPartitionContentPath.toString()); + logger.info("Success to write {} metrics ({}) to file {}", count, metricType, curPartitionContentPath); } catch 
(IOException e) { logger.error("Fails to write metrics(" + metricType + ") to file " + curPartitionContentPath.toString() + " due to ", e); @@ -256,5 +260,4 @@ public class HiveProducer { return new HiveProducerRecord(tableNameSplits.getFirst(), tableNameSplits.getSecond(), partitionKVs, columnValues); } - } diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java index 8bf93ec..e1fb782 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveProducerRecord.java @@ -106,6 +106,9 @@ public class HiveProducerRecord { return result; } + /** + * Use to organize metrics message + */ public class RecordKey { public static final String DEFAULT_DB_NAME = "DEFAULT"; diff --git a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java index 48d3e31..8e8f3b6 100644 --- a/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java +++ b/metrics-reporter-hive/src/main/java/org/apache/kylin/metrics/lib/impl/hive/HiveReservoirReporter.java @@ -109,7 +109,7 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { private class HiveReservoirListener implements ActiveReservoirListener { private Properties props; - private Map<String, HiveProducer> producerMap = new HashMap<String, HiveProducer>(); + private Map<String, HiveProducer> producerMap = new HashMap<>(); private HiveReservoirListener(Properties props) throws Exception { this.props = props; @@ -125,10 +125,10 @@ public class HiveReservoirReporter extends ActiveReservoirReporter { } public boolean onRecordUpdate(final List<Record> 
records) { - if (records.size() == 0) { + if (records.isEmpty()) { return true; } - logger.info("Try to write " + records.size() + " records"); + logger.info("Try to write {} records", records.size()); try { Map<String, List<Record>> queues = new HashMap<>(); for (Record record : records) { diff --git a/server/src/main/resources/kylinMetrics.xml b/server/src/main/resources/kylinMetrics.xml index 7aa3eb3..843fb91 100644 --- a/server/src/main/resources/kylinMetrics.xml +++ b/server/src/main/resources/kylinMetrics.xml @@ -17,17 +17,24 @@ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd"> - <description>Kylin Metrics Related Configuration</description> + <description>Kylin Metrics Related Configuration (SystemCube)</description> + <!-- A Reservoir which don't staged metrics message at all, emit it in no time. Maybe good for debug purpose.--> <bean id="instantReservoir" class="org.apache.kylin.metrics.lib.impl.InstantReservoir"/> + <!-- A Reservoir which staged metrics message in memory, and emit them in fixed rate. 
--> <bean id="blockingReservoir" class="org.apache.kylin.metrics.lib.impl.BlockingReservoir"> + <!-- minReportSize, only if currently count of staged message exceed minReportSize, will Reservoir try to write message--> <constructor-arg index="0"> <value>100</value> </constructor-arg> + + <!-- maxReportSize, max size of report in one time --> <constructor-arg index="1"> <value>500</value> </constructor-arg> + + <!-- minReportTime, min duration(in minute) between two report action--> <constructor-arg index="2"> <value>10</value> </constructor-arg> diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 0b06849..911d32e 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -91,8 +91,7 @@ public class HiveInputBase { // create global dict KylinConfig dictConfig = (flatDesc.getSegment()).getConfig(); String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns(); - if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 - && !"".equals(mrHiveDictColumns[0])) { + if (mrHiveDictColumns.length > 0) { String globalDictDatabase = dictConfig.getMrHiveDictDB(); if (null == globalDictDatabase) { throw new IllegalArgumentException("Mr-Hive Global dict database is null.");