[1/2] kylin git commit: KYLIN-1826, add external hive interface, project, table.. Signed-off-by: terry-chelsea <hzfen...@corp.netease.com> [Forced Update!]
Repository: kylin Updated Branches: refs/heads/KYLIN-1826 726fb8fd6 -> 417fed32f (forced update) http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java -- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java new file mode 100644 index 000..ef7b54d --- /dev/null +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/external/ExternalHiveMRInput.java @@ -0,0 +1,200 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ +package org.apache.kylin.source.hive.external; + +import java.io.IOException; + +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.JobBuilderSupport; +import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.apache.kylin.job.engine.JobEngineConfig; +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.AbstractExecutable; +import org.apache.kylin.job.execution.DefaultChainedExecutable; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; +import org.apache.kylin.metadata.model.IJoinedFlatTableDesc; +import org.apache.kylin.source.hive.CreateFlatHiveTableStep; +import org.apache.kylin.source.hive.HiveCmdBuilder; +import org.apache.kylin.source.hive.HiveMRInput; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * External hive file mr input + * @author hzfengyu + */ +public class ExternalHiveMRInput extends HiveMRInput { +@Override +public IMRBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) { +return new BatchFileCubingInputSide(flatDesc); +} + +public static class BatchFileCubingInputSide extends BatchCubingInputSide { +private static final Logger logger = LoggerFactory.getLogger(BatchFileCubingInputSide.class); + +public BatchFileCubingInputSide(IJoinedFlatTableDesc flatDesc) { +super(flatDesc); +} + +@Override +public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) { +super.addStepPhase1_CreateFlatTable(jobFlow); +final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + +/** + * Create table as flat hive table in default hive the same as external hive table, 
use it in next steps. + * Do not create view in default table. because kylin read lookup table file. + */ +jobFlow.addTask(createFlatTableInDefaultHive(conf, flatDesc, jobFlow.getId(), cubeName)); +AbstractExecutable copyDataStep = createCopyHiveDataStep(flatDesc.getTableName(), flatDesc.getHiveName(), +JoinedFlatTable.getTableDir(flatDesc, JobBuilderSupport.getJobWorkingDir(conf, jobFlow.getId(; +if(copyDataStep != null) { +jobFlow.addTask(copyDataStep); +} +} + +@Override +protected String getRowCountOutputDir(JobEngineConfig conf, String jobId) { +String tempDir = System.getProperty("java.io.tmpdir", "/tmp"); +return String.format("file://%s/kylin-%s/%s", tempDir, jobId, "/row_count"); +} + +@Override +protected boolean isWriteToLocalDir() { +return true; +} + +protected AbstractExecutable createCopyHiveDataStep(String flatHiveTableName, String hiveName, String output) { +DistcpShellExecutable copyHiveTableSetp = new DistcpShellExecutable(); + copyHiveTableSetp.setName(ExecutableConstants.STEP_NAME_COPY_HIVE_DATA); +copyHiveTableSetp.setHiveName(hiveName); +
[2/2] kylin git commit: KYLIN-1826, add external hive interface, project, table.. Signed-off-by: terry-chelsea <hzfen...@corp.netease.com>
KYLIN-1826, add external hive interface, project, table.. Signed-off-by: terry-chelseaProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/417fed32 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/417fed32 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/417fed32 Branch: refs/heads/KYLIN-1826 Commit: 417fed32f7abbea459906f5be148c974659effd4 Parents: b08871e Author: terry-chelsea Authored: Fri Oct 28 20:38:35 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 14:11:46 2016 +0800 -- .../apache/kylin/common/KylinConfigBase.java| 5 + .../cube/model/CubeJoinedFlatTableDesc.java | 5 + .../cube/model/CubeJoinedFlatTableEnrich.java | 4 + .../org/apache/kylin/job/JoinedFlatTable.java | 10 +- .../kylin/job/constant/ExecutableConstants.java | 2 + .../metadata/model/IJoinedFlatTableDesc.java| 2 + .../kylin/metadata/model/ISourceAware.java | 1 + .../apache/kylin/metadata/model/TableDesc.java | 15 +- .../kylin/metadata/project/ProjectInstance.java | 11 + .../kylin/metadata/project/ProjectManager.java | 22 +- .../hive/ITHiveSourceTableLoaderTest.java | 3 +- .../source/hive/ITHiveTableReaderTest.java | 2 +- .../kylin/rest/controller/CubeController.java | 17 ++ .../rest/controller/ProjectController.java | 35 +++- .../rest/controller/StreamingController.java| 4 + .../kylin/rest/controller/TableController.java | 24 ++- .../rest/request/CreateProjectRequest.java | 12 ++ .../rest/request/UpdateProjectRequest.java | 13 ++ .../kylin/rest/response/TableDescResponse.java | 1 + .../apache/kylin/rest/service/BasicService.java | 14 +- .../apache/kylin/rest/service/CacheService.java | 4 +- .../apache/kylin/rest/service/CubeService.java | 10 +- .../kylin/rest/service/ProjectService.java | 6 +- .../source/hive/CreateFlatHiveTableStep.java| 11 +- .../apache/kylin/source/hive/HiveClient.java| 2 +- .../kylin/source/hive/HiveCmdBuilder.java | 13 +- .../apache/kylin/source/hive/HiveMRInput.java | 87 +--- 
.../source/hive/HiveSourceTableLoader.java | 22 +- .../org/apache/kylin/source/hive/HiveTable.java | 7 +- .../kylin/source/hive/HiveTableReader.java | 16 +- .../apache/kylin/source/hive/HqlExecutable.java | 12 +- .../hive/external/DistcpShellExecutable.java| 100 + .../hive/external/ExternalHiveClient.java | 80 +++ .../hive/external/ExternalHiveMRInput.java | 200 ++ .../hive/external/ExternalHiveSource.java | 51 + .../kylin/source/hive/external/HiveManager.java | 206 +++ webapp/app/js/controllers/page.js | 1 + webapp/app/js/controllers/sourceMeta.js | 4 +- webapp/app/js/model/projectConfig.js| 1 + .../app/partials/projects/project_create.html | 9 + webapp/app/partials/projects/projects.html | 1 + 41 files changed, 975 insertions(+), 70 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 79ee084..39c4a3e 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -365,6 +365,10 @@ abstract public class KylinConfigBase implements Serializable { public String getCliWorkingDir() { return getOptional("kylin.job.remote.cli.working.dir"); } + +public String getExternalHiveRootDirectory() { +return getOptional("kylin.external.hive.root.directory", null); +} public boolean isEmptySegmentAllowed() { return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true")); @@ -717,6 +721,7 @@ abstract public class KylinConfigBase implements Serializable { Map r = convertKeyToInteger(getPropertiesByPrefix("kylin.source.engine.")); // ref constants in ISourceAware r.put(0, "org.apache.kylin.source.hive.HiveSource"); +r.put(6, "org.apache.kylin.source.hive.external.ExternalHiveSource"); return r; } 
http://git-wip-us.apache.org/repos/asf/kylin/blob/417fed32/core-cube/src/main/java/org/apache/kylin/cube/model/CubeJoinedFlatTableDesc.java -- diff --git
kylin git commit: KYLIN-1726 KafkaSource may get wrong start offset when partition added.
Repository: kylin Updated Branches: refs/heads/master c8efa5483 -> 309593b40 KYLIN-1726 KafkaSource may get wrong start offset when partition added. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/309593b4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/309593b4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/309593b4 Branch: refs/heads/master Commit: 309593b4089a1868a92b7a8c339d2fc63911bfe7 Parents: c8efa54 Author: shaofengshiAuthored: Wed Nov 9 09:28:09 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 13:38:30 2016 +0800 -- .../apache/kylin/source/SourcePartition.java| 12 +-- .../apache/kylin/source/kafka/KafkaSource.java | 38 .../kylin/source/kafka/util/KafkaClient.java| 5 +-- 3 files changed, 44 insertions(+), 11 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/309593b4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java -- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java index 8ba749d..cf89580 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java @@ -18,8 +18,11 @@ package org.apache.kylin.source; +import java.util.HashMap; import java.util.Map; +import com.google.common.base.Objects; + /** */ public class SourcePartition { @@ -90,14 +93,19 @@ public class SourcePartition { this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; } +@Override +public String toString() { +return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset", startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString(); +} + public static SourcePartition 
getCopyOf(SourcePartition origin) { SourcePartition copy = new SourcePartition(); copy.setStartDate(origin.getStartDate()); copy.setEndDate(origin.getEndDate()); copy.setStartOffset(origin.getStartOffset()); copy.setEndOffset(origin.getEndOffset()); - copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart()); -copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd()); +copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart())); +copy.setSourcePartitionOffsetEnd(new HashMap<>(origin.getSourcePartitionOffsetEnd())); return copy; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/309593b4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 7a5d94f..b0c8e7f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; @@ -34,12 +35,16 @@ import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.common.collect.Lists; //used by reflection public class KafkaSource implements ISource { +private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class); + @SuppressWarnings("unchecked") @Override public I adaptToBuildEngine(Class engineInterface) { @@ -71,32 +76,50 @@ public 
class KafkaSource implements ISource { if (result.getStartOffset() == 0) { final CubeSegment last = cube.getLastSegment(); if (last != null) { +logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd()); // from last seg's end position
[kylin] Git Push Summary
Repository: kylin Updated Branches: refs/heads/KYLIN-2122 [deleted] 2beccbf94
kylin git commit: Add more checks
Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 1ffbe19aa -> 710232e14 Add more checks Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/710232e1 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/710232e1 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/710232e1 Branch: refs/heads/v1.6.0-rc1 Commit: 710232e14d461334a42f1a957c291668d661a135 Parents: 1ffbe19 Author: shaofengshiAuthored: Wed Nov 9 11:14:05 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 11:14:05 2016 +0800 -- .../java/org/apache/kylin/source/kafka/KafkaSource.java | 10 ++ 1 file changed, 10 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/710232e1/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index a919043..b0c8e7f 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -109,6 +109,16 @@ public class KafkaSource implements ISource { logger.debug("Seek end offsets from topic"); Map latestOffsets = KafkaClient.getCurrentOffsets(cube); logger.debug("The end offsets are " + latestOffsets); + +for (Integer partitionId : latestOffsets.keySet()) { +if (result.getSourcePartitionOffsetStart().containsKey(partitionId)) { +if (result.getSourcePartitionOffsetStart().get(partitionId) > latestOffsets.get(partitionId)) { +throw new IllegalArgumentException("Partition " + partitionId + " end offset (" + latestOffsets.get(partitionId) + ") is smaller than start offset ( " + result.getSourcePartitionOffsetStart().get(partitionId) + ")"); +} +} else { +throw new IllegalStateException("New partition added in between, retry."); +} +} result.setSourcePartitionOffsetEnd(latestOffsets); }
kylin git commit: more logs
Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 0425228df -> 39b7dad41 more logs Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/39b7dad4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/39b7dad4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/39b7dad4 Branch: refs/heads/v1.6.0-rc1 Commit: 39b7dad41e2764def1385f3f4b5303ca7f940b10 Parents: 0425228 Author: shaofengshiAuthored: Wed Nov 9 10:25:58 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 10:25:58 2016 +0800 -- .../apache/kylin/source/SourcePartition.java| 12 ++-- .../apache/kylin/source/kafka/KafkaSource.java | 29 ++-- 2 files changed, 24 insertions(+), 17 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java -- diff --git a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java index 8ba749d..161977a 100644 --- a/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java +++ b/core-metadata/src/main/java/org/apache/kylin/source/SourcePartition.java @@ -18,8 +18,11 @@ package org.apache.kylin.source; +import java.util.HashMap; import java.util.Map; +import com.google.common.base.Objects; + /** */ public class SourcePartition { @@ -90,14 +93,19 @@ public class SourcePartition { this.sourcePartitionOffsetEnd = sourcePartitionOffsetEnd; } +@Override +public String toString() { +return Objects.toStringHelper(this).add("startDate", startDate).add("endDate", endDate).add("startOffset", startOffset).add("endOffset", endOffset).add("sourcePartitionOffsetStart", sourcePartitionOffsetStart.toString()).add("sourcePartitionOffsetEnd", sourcePartitionOffsetEnd.toString()).toString(); +} + public static SourcePartition getCopyOf(SourcePartition origin) { SourcePartition copy = new SourcePartition(); 
copy.setStartDate(origin.getStartDate()); copy.setEndDate(origin.getEndDate()); copy.setStartOffset(origin.getStartOffset()); copy.setEndOffset(origin.getEndOffset()); - copy.setSourcePartitionOffsetStart(origin.getSourcePartitionOffsetStart()); -copy.setSourcePartitionOffsetEnd(origin.getSourcePartitionOffsetEnd()); +copy.setSourcePartitionOffsetStart(new HashMap<>(origin.getSourcePartitionOffsetStart())); +copy.setSourcePartitionOffsetEnd(new HashMap<>origin.getSourcePartitionOffsetEnd()); return copy; } } http://git-wip-us.apache.org/repos/asf/kylin/blob/39b7dad4/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index b0434f0..a919043 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -35,11 +35,11 @@ import org.apache.kylin.source.ReadableTable; import org.apache.kylin.source.SourcePartition; import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaClient; - -import com.google.common.collect.Lists; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.common.collect.Lists; + //used by reflection public class KafkaSource implements ISource { @@ -94,23 +94,22 @@ public class KafkaSource implements ISource { final String topic = kafakaConfig.getTopic(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { final List partitionInfos = consumer.partitionsFor(topic); -if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) { -// has new partition added -logger.debug("has new partition added"); -for (PartitionInfo partitionInfo : partitionInfos) { -if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == 
false) { -long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition()); -logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest); - result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest); -} +
kylin git commit: add debug message
Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 60cd1acbc -> 0425228df add debug message Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0425228d Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0425228d Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0425228d Branch: refs/heads/v1.6.0-rc1 Commit: 0425228df531ac120a6bd642de5ddeddbde36519 Parents: 60cd1ac Author: shaofengshiAuthored: Wed Nov 9 09:51:35 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 09:51:35 2016 +0800 -- .../java/org/apache/kylin/source/kafka/KafkaSource.java | 12 1 file changed, 12 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/0425228d/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index e736ad1..b0434f0 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -37,10 +37,14 @@ import org.apache.kylin.source.kafka.config.KafkaConfig; import org.apache.kylin.source.kafka.util.KafkaClient; import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; //used by reflection public class KafkaSource implements ISource { +private static final Logger logger = LoggerFactory.getLogger(KafkaSource.class); + @SuppressWarnings("unchecked") @Override public I adaptToBuildEngine(Class engineInterface) { @@ -72,12 +76,15 @@ public class KafkaSource implements ISource { if (result.getStartOffset() == 0) { final CubeSegment last = cube.getLastSegment(); if (last != null) { +logger.debug("Last segment exists, continue from last segment " + last.getName() + "'s end position: " + last.getSourcePartitionOffsetEnd()); // from last seg's end position 
result.setSourcePartitionOffsetStart(last.getSourcePartitionOffsetEnd()); } else if (cube.getDescriptor().getPartitionOffsetStart() != null && cube.getDescriptor().getPartitionOffsetStart().size() > 0) { +logger.debug("Last segment doesn't exist, use the start offset that be initiated previously: " + cube.getDescriptor().getPartitionOffsetStart()); result.setSourcePartitionOffsetStart(cube.getDescriptor().getPartitionOffsetStart()); } else { // from the topic's very begining; +logger.debug("Last segment doesn't exist, and didn't initiate the start offset, will seek from topic's very beginning."); result.setSourcePartitionOffsetStart(KafkaClient.getEarliestOffsets(cube)); } } @@ -89,12 +96,16 @@ public class KafkaSource implements ISource { final List partitionInfos = consumer.partitionsFor(topic); if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) { // has new partition added +logger.debug("has new partition added"); for (PartitionInfo partitionInfo : partitionInfos) { if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) { long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition()); +logger.debug("new partition " + partitionInfo.partition() + " starts from " + earliest); result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest); } } +} else { +logger.debug("no new partition"); } } @@ -121,6 +132,7 @@ public class KafkaSource implements ISource { result.setStartOffset(totalStartOffset); result.setEndOffset(totalEndOffset); +logger.debug("KafkaSource.parsePartitionBeforeBuild, return " + result); return result; }
kylin git commit: KYLIN-1726 KafkaSource may get wrong start offset when partition added.
Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 5d166aa8e -> 60cd1acbc KYLIN-1726 KafkaSource may get wrong start offset when partition added. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/60cd1acb Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/60cd1acb Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/60cd1acb Branch: refs/heads/v1.6.0-rc1 Commit: 60cd1acbca9c84f4ab69931d781c81e1b017eb56 Parents: 5d166aa Author: shaofengshiAuthored: Wed Nov 9 09:28:09 2016 +0800 Committer: shaofengshi Committed: Wed Nov 9 09:28:09 2016 +0800 -- .../java/org/apache/kylin/source/kafka/KafkaSource.java | 11 +++ .../org/apache/kylin/source/kafka/util/KafkaClient.java | 5 +++-- 2 files changed, 10 insertions(+), 6 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/60cd1acb/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java index 7a5d94f..e736ad1 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSource.java @@ -23,6 +23,7 @@ import java.util.Map; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.common.PartitionInfo; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.IMRInput; @@ -81,16 +82,18 @@ public class KafkaSource implements ISource { } } -final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cube.getConfig()).getKafkaConfig(cube.getFactTable()); +final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(KylinConfig.getInstanceFromEnv()).getKafkaConfig(cube.getFactTable()); final String brokers = 
KafkaClient.getKafkaBrokers(kafakaConfig); final String topic = kafakaConfig.getTopic(); try (final KafkaConsumer consumer = KafkaClient.getKafkaConsumer(brokers, cube.getName(), null)) { final List partitionInfos = consumer.partitionsFor(topic); if (partitionInfos.size() > result.getSourcePartitionOffsetStart().size()) { // has new partition added -for (int x = result.getSourcePartitionOffsetStart().size(); x < partitionInfos.size(); x++) { -long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfos.get(x).partition()); - result.getSourcePartitionOffsetStart().put(partitionInfos.get(x).partition(), earliest); +for (PartitionInfo partitionInfo : partitionInfos) { +if (result.getSourcePartitionOffsetStart().containsKey(partitionInfo.partition()) == false) { +long earliest = KafkaClient.getEarliestOffset(consumer, topic, partitionInfo.partition()); + result.getSourcePartitionOffsetStart().put(partitionInfo.partition(), earliest); +} } } } http://git-wip-us.apache.org/repos/asf/kylin/blob/60cd1acb/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java -- diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java index a0bbd22..446c076 100644 --- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java +++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/util/KafkaClient.java @@ -22,6 +22,7 @@ import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.common.PartitionInfo; import org.apache.kafka.common.TopicPartition; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.source.kafka.KafkaConfigManager; import org.apache.kylin.source.kafka.config.BrokerConfig; @@ -117,7 +118,7 @@ public class KafkaClient { } public static Map 
getCurrentOffsets(final CubeInstance cubeInstance) { -final KafkaConfig kafakaConfig = KafkaConfigManager.getInstance(cubeInstance.getConfig()).getKafkaConfig(cubeInstance.getFactTable()); +final KafkaConfig kafakaConfig =
[2/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0d3af7e0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0d3af7e0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0d3af7e0 Branch: refs/heads/yang21-cdh5.7 Commit: 0d3af7e005ee0ee6bdb6917715997b2e8f585623 Parents: 3b07c26 Author: shaofengshiAuthored: Wed Mar 23 17:07:05 2016 +0800 Committer: Hongbin Ma Committed: Wed Nov 9 08:15:48 2016 +0800 -- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../kylin/provision/BuildCubeWithEngine.java| 13 ++- pom.xml | 12 +-- .../kylin/rest/security/AclHBaseStorage.java| 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 36 +++- .../apache/kylin/rest/service/QueryService.java | 24 +++-- .../apache/kylin/rest/service/UserService.java | 17 ++-- .../kylin/storage/hbase/HBaseConnection.java| 44 - .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++ .../kylin/storage/hbase/HBaseStorage.java | 3 +- .../storage/hbase/cube/SimpleHBaseStore.java| 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- .../hbase/cube/v1/RegionScannerAdapter.java | 10 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java| 14 ++- .../observer/ObserverAggregationCache.java | 10 ++- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 23 
++--- .../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 23 ++--- .../storage/hbase/util/CleanHtableCLI.java | 12 +-- .../storage/hbase/util/CubeMigrationCLI.java| 36 .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java| 22 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++ .../kylin/storage/hbase/util/HBaseClean.java| 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 35 .../kylin/storage/hbase/util/HBaseUsage.java| 9 +- .../storage/hbase/util/HbaseStreamingInput.java | 30 +++ .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 20 +++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 26 ++ .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 36 .../kylin/tool/CubeMigrationCheckCLI.java | 16 ++-- .../kylin/tool/ExtendCubeToHybridCLI.java | 8 +- .../apache/kylin/tool/StorageCleanupJob.java| 20 +++-- 53 files changed, 450 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/examples/test_case_data/sandbox/hbase-site.xml -- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ zookeeper.znode.parent /hbase-unsecure - -hbase.client.pause -100 -General client pause value. Used mostly as value to wait -before running a retry of a failed get, region lookup, etc. 
-See hbase.client.retries.number for description of how we backoff from -this initial pause amount and how this pause works w/ retries. - - -hbase.client.retries.number -5 -Maximum retries. Used as maximum for all retryable -operations such as the getting of a cell's value, starting a row update, -
[1/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-cdh5.7 f58ece862 -> 6d6486f30 (forced update) http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index 0e95102..c59fb33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; @@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; -final HTableInterface hTable; +final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } -private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { +private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); -final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); +final Table hTable 
= HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 5b2441c..2f7e164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); -Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); -HBaseAdmin admin = null; +Admin admin = null; try { -admin = new HBaseAdmin(conf); +Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); +admin = conn.getAdmin(); + for (String table : oldTables) { -if (admin.tableExists(table)) { 
-HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); +if (admin.tableExists(TableName.valueOf(table))) { +HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { -
[2/2] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/0d3af7e0 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/0d3af7e0 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/0d3af7e0 Branch: refs/heads/yang21-hbase1.x Commit: 0d3af7e005ee0ee6bdb6917715997b2e8f585623 Parents: 3b07c26 Author: shaofengshiAuthored: Wed Mar 23 17:07:05 2016 +0800 Committer: Hongbin Ma Committed: Wed Nov 9 08:15:48 2016 +0800 -- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../kylin/provision/BuildCubeWithEngine.java| 13 ++- pom.xml | 12 +-- .../kylin/rest/security/AclHBaseStorage.java| 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 36 +++- .../apache/kylin/rest/service/QueryService.java | 24 +++-- .../apache/kylin/rest/service/UserService.java | 17 ++-- .../kylin/storage/hbase/HBaseConnection.java| 44 - .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++ .../kylin/storage/hbase/HBaseStorage.java | 3 +- .../storage/hbase/cube/SimpleHBaseStore.java| 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- .../hbase/cube/v1/RegionScannerAdapter.java | 10 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java| 14 ++- .../observer/ObserverAggregationCache.java | 10 ++- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 23 
++--- .../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 23 ++--- .../storage/hbase/util/CleanHtableCLI.java | 12 +-- .../storage/hbase/util/CubeMigrationCLI.java| 36 .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java| 22 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++ .../kylin/storage/hbase/util/HBaseClean.java| 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 35 .../kylin/storage/hbase/util/HBaseUsage.java| 9 +- .../storage/hbase/util/HbaseStreamingInput.java | 30 +++ .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 20 +++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 26 ++ .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 36 .../kylin/tool/CubeMigrationCheckCLI.java | 16 ++-- .../kylin/tool/ExtendCubeToHybridCLI.java | 8 +- .../apache/kylin/tool/StorageCleanupJob.java| 20 +++-- 53 files changed, 450 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/examples/test_case_data/sandbox/hbase-site.xml -- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ zookeeper.znode.parent /hbase-unsecure - -hbase.client.pause -100 -General client pause value. Used mostly as value to wait -before running a retry of a failed get, region lookup, etc. 
-See hbase.client.retries.number for description of how we backoff from -this initial pause amount and how this pause works w/ retries. - - -hbase.client.retries.number -5 -Maximum retries. Used as maximum for all retryable -operations such as the getting of a cell's value, starting a row update, -
[1/2] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-hbase1.x 337caca97 -> 0d3af7e00 (forced update) http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index 0e95102..c59fb33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; @@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; -final HTableInterface hTable; +final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } -private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { +private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); -final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); +final Table 
hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/0d3af7e0/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 5b2441c..2f7e164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); -Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); -HBaseAdmin admin = null; +Admin admin = null; try { -admin = new HBaseAdmin(conf); +Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); +admin = conn.getAdmin(); + for (String table : oldTables) { -if (admin.tableExists(table)) { 
-HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); +if (admin.tableExists(TableName.valueOf(table))) { +HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { -
[1/6] kylin git commit: KYLIN-1851 add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
Repository: kylin Updated Branches: refs/heads/master ddec049a6 -> c8efa5483 http://git-wip-us.apache.org/repos/asf/kylin/blob/58584483/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java new file mode 100644 index 000..dfc46b6 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps.fdc2; + +import com.google.common.collect.Lists; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +/** + */ +public class FactDistinctHiveColumnsMapper2 extends FactDistinctColumnsMapperBase2{ + +protected boolean collectStatistics = false; +protected CuboidScheduler cuboidScheduler = null; +protected int nRowKey; +private Integer[][] allCuboidsBitSet = null; +private HyperLogLogPlusCounter[] allCuboidsHLL = null; +private Long[] cuboidIds; +private HashFunction hf = null; +private int rowCount = 0; +private int samplingPercentage; +private ByteArray[] row_hashcodes = null; +private ByteBuffer keyBuffer; +private static final Text EMPTY_TEXT = new Text(); +public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; +public static final byte MARK_FOR_HLL = (byte) 0xFF; + +private int partitionColumnIndex = -1; +private boolean needFetchPartitionCol = true; + +@Override +protected void setup(Context context) throws IOException { +super.setup(context); +keyBuffer = ByteBuffer.allocate(4096); +collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); +if (collectStatistics) { +samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); +cuboidScheduler = new 
CuboidScheduler(cubeDesc); +nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; + +List cuboidIdList = Lists.newArrayList(); +List allCuboidsBitSetList = Lists.newArrayList(); +addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList); + +allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]); +cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]); + +allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length]; +for (int i = 0; i < cuboidIds.length; i++) { +allCuboidsHLL[i] = new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()); +} + +hf = Hashing.murmur3_32(); +row_hashcodes = new ByteArray[nRowKey]; +for (int i = 0; i < nRowKey; i++) { +row_hashcodes[i] = new ByteArray(); +} + +TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); +if (partitionColRef != null) { +partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); +} + +// check whether need fetch the partition col values +if (partitionColumnIndex <
[3/6] kylin git commit: KYLIN-2006 Make job engine distributed and HA
KYLIN-2006 Make job engine distributed and HA Signed-off-by: Yang LiProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4f66783e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4f66783e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4f66783e Branch: refs/heads/master Commit: 4f66783e1e4a765080e26e19cc0fe53a78ca599a Parents: 5858448 Author: kangkaisen Authored: Mon Sep 5 20:15:23 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 06:32:01 2016 +0800 -- .../apache/kylin/common/KylinConfigBase.java| 1 + .../kylin/job/execution/ExecutableManager.java | 23 ++ .../impl/threadpool/DistributedScheduler.java | 349 +++ .../kylin/job/lock/DistributedJobLock.java | 29 ++ .../org/apache/kylin/job/lock/DoWatchLock.java | 23 ++ .../kylin/job/BaseTestDistributedScheduler.java | 226 .../apache/kylin/job/ContextTestExecutable.java | 51 +++ .../job/ITDistributedSchedulerBaseTest.java | 90 + .../job/ITDistributedSchedulerTakeOverTest.java | 60 .../kylin/rest/controller/JobController.java| 62 +--- .../apache/kylin/rest/service/CubeService.java | 4 + .../apache/kylin/rest/service/JobService.java | 96 - .../hbase/util/ZookeeperDistributedJobLock.java | 230 13 files changed, 1182 insertions(+), 62 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index 6d3e807..ee9f57c 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -755,6 +755,7 @@ abstract public class KylinConfigBase implements Serializable { public Map getSchedulers() { Map r = convertKeyToInteger(getPropertiesByPrefix("kylin.scheduler.")); r.put(0, 
"org.apache.kylin.job.impl.threadpool.DefaultScheduler"); +r.put(2, "org.apache.kylin.job.impl.threadpool.DistributedScheduler"); return r; } http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 0901443..92fc8c9 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -235,11 +235,30 @@ public class ExecutableManager { } } +public void resumeRunningJobForce(String jobId) { +AbstractExecutable job = getJob(jobId); +if (job == null) { +return; +} + +if (job instanceof DefaultChainedExecutable) { +List tasks = ((DefaultChainedExecutable) job).getTasks(); +for (AbstractExecutable task : tasks) { +if (task.getStatus() == ExecutableState.RUNNING) { +updateJobOutput(task.getId(), ExecutableState.READY, null, null); +break; +} +} +} +updateJobOutput(jobId, ExecutableState.READY, null, null); +} + public void resumeJob(String jobId) { AbstractExecutable job = getJob(jobId); if (job == null) { return; } + if (job instanceof DefaultChainedExecutable) { List tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { @@ -254,6 +273,10 @@ public class ExecutableManager { public void discardJob(String jobId) { AbstractExecutable job = getJob(jobId); +if (job == null) { +return; +} + if (job instanceof DefaultChainedExecutable) { List tasks = ((DefaultChainedExecutable) job).getTasks(); for (AbstractExecutable task : tasks) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4f66783e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java
[5/6] kylin git commit: KYLIN-2006 refactor test case, avoid conflict with default CI metadata
KYLIN-2006 refactor test case, avoid conflict with default CI metadata Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/b1e81d4e Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/b1e81d4e Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/b1e81d4e Branch: refs/heads/master Commit: b1e81d4e261a5e9367390e63f8148468fac9e001 Parents: 85c4ded Author: Yang LiAuthored: Tue Nov 8 22:30:23 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 06:32:02 2016 +0800 -- .../common/util/AbstractKylinTestCase.java | 4 +- .../kylin/job/execution/AbstractExecutable.java | 4 + .../kylin/job/execution/ExecutableManager.java | 4 + .../impl/threadpool/DistributedScheduler.java | 6 +- .../kylin/job/BaseTestDistributedScheduler.java | 121 +-- .../apache/kylin/job/ContextTestExecutable.java | 9 +- .../job/ITDistributedSchedulerBaseTest.java | 22 ++-- .../job/ITDistributedSchedulerTakeOverTest.java | 10 +- .../hbase/util/ZookeeperDistributedJobLock.java | 42 --- 9 files changed, 115 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java -- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index 14bf90b..2154c32 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase { "org.apache.kylin.storage.hybrid.HybridManager", // "org.apache.kylin.metadata.realization.RealizationRegistry", // "org.apache.kylin.metadata.project.ProjectManager", // -"org.apache.kylin.metadata.MetadataManager" // +"org.apache.kylin.metadata.MetadataManager", // 
+"org.apache.kylin.job.impl.threadpool.DistributedScheduler", // +"org.apache.kylin.job.manager.ExecutableManager", // }; public abstract void createTestMetadata() throws Exception; http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 2a4b2df..9292418 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { this.config = config; } +protected KylinConfig getConfig() { +return config; +} + protected ExecutableManager getManager() { return ExecutableManager.getInstance(config); } http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 92fc8c9..1db612f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -65,6 +65,10 @@ public class ExecutableManager { return r; } +public static void clearCache() { +CACHE.clear(); +} + private ExecutableManager(KylinConfig config) { logger.info("Using metadata url: " + config); this.config = config; http://git-wip-us.apache.org/repos/asf/kylin/blob/b1e81d4e/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java 
b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 17df119..3937a24 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -43,11 +43,11 @@ import
[4/6] kylin git commit: KYLIN-2006 minor revision
KYLIN-2006 minor revision Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/85c4ded4 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/85c4ded4 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/85c4ded4 Branch: refs/heads/master Commit: 85c4ded4cc6882d661d79cfbe8bec0f7d7015ba9 Parents: 4f66783 Author: Yang LiAuthored: Mon Nov 7 20:59:25 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 06:32:02 2016 +0800 -- .../impl/threadpool/DistributedScheduler.java | 3 +-- .../kylin/job/lock/DistributedJobLock.java | 7 +- .../org/apache/kylin/job/lock/DoWatchLock.java | 23 .../hbase/util/ZookeeperDistributedJobLock.java | 1 - 4 files changed, 7 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 11709c7..17df119 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -46,7 +46,6 @@ import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.DistributedJobLock; -import org.apache.kylin.job.lock.DoWatchLock; import org.apache.kylin.job.lock.JobLock; import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; @@ -209,7 +208,7 @@ public class DistributedScheduler implements Scheduler, Conn } //when the segment lock released but the segment related job still running, resume the job. 
-private class DoWatchImpl implements DoWatchLock { +private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock { private String serverName; public DoWatchImpl(String serverName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java index 5ba8426..9335e56 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java @@ -21,9 +21,14 @@ package org.apache.kylin.job.lock; import java.util.concurrent.ExecutorService; public interface DistributedJobLock extends JobLock { -boolean lockWithName(String cubeName, String serverName); + +boolean lockWithName(String name, String serverName); void unlockWithName(String name); void watchLock(ExecutorService pool, DoWatchLock doWatch); + +public interface DoWatchLock { +void doWatch(String path, String data); +} } http://git-wip-us.apache.org/repos/asf/kylin/blob/85c4ded4/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java deleted file mode 100644 index 08c13f9..000 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.lock; - -public interface DoWatchLock { -void doWatch(String path, String data); -}
[6/6] kylin git commit: KYLIN-2169 fix test case
KYLIN-2169 fix test case Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/c8efa548 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/c8efa548 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/c8efa548 Branch: refs/heads/master Commit: c8efa548307e6acec92740d2049b885b8b98190f Parents: b1e81d4 Author: Yang LiAuthored: Wed Nov 9 00:40:47 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 06:34:04 2016 +0800 -- .../org/apache/kylin/job/execution/AbstractExecutable.java | 5 +++-- .../apache/kylin/job/execution/DefaultChainedExecutable.java | 8 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/c8efa548/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 9292418..80a92de 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -61,7 +61,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { setId(UUID.randomUUID().toString()); } -void initConfig(KylinConfig config) { +protected void initConfig(KylinConfig config) { Preconditions.checkState(this.config == null || this.config == config); this.config = config; } @@ -201,7 +201,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent { @Override public final ExecutableState getStatus() { -return getManager().getOutput(this.getId()).getState(); +ExecutableManager manager = getManager(); +return manager.getOutput(this.getId()).getState(); } @Override http://git-wip-us.apache.org/repos/asf/kylin/blob/c8efa548/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java -- diff --git 
a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 7b92608..fccab30 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.execution; import java.util.List; import java.util.Map; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.ExecuteException; import com.google.common.collect.Lists; @@ -36,6 +37,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai super(); } +protected void initConfig(KylinConfig config) { +super.initConfig(config); +for (AbstractExecutable sub : subTasks) { +sub.initConfig(config); +} +} + @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { List executables = getTasks();
kylin git commit: KYLIN-2169 fix test case
Repository: kylin Updated Branches: refs/heads/KYLIN-2006 aea46d7ce -> 248b181ce KYLIN-2169 fix test case Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/248b181c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/248b181c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/248b181c Branch: refs/heads/KYLIN-2006 Commit: 248b181ce5a6f5f43d8a64689aa2233b4a29d869 Parents: aea46d7 Author: Yang LiAuthored: Wed Nov 9 00:40:47 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 00:40:47 2016 +0800 -- .../org/apache/kylin/job/execution/AbstractExecutable.java | 5 +++-- .../apache/kylin/job/execution/DefaultChainedExecutable.java | 8 2 files changed, 11 insertions(+), 2 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/248b181c/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 9292418..80a92de 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -61,7 +61,7 @@ public abstract class AbstractExecutable implements Executable, Idempotent { setId(UUID.randomUUID().toString()); } -void initConfig(KylinConfig config) { +protected void initConfig(KylinConfig config) { Preconditions.checkState(this.config == null || this.config == config); this.config = config; } @@ -201,7 +201,8 @@ public abstract class AbstractExecutable implements Executable, Idempotent { @Override public final ExecutableState getStatus() { -return getManager().getOutput(this.getId()).getState(); +ExecutableManager manager = getManager(); +return manager.getOutput(this.getId()).getState(); } @Override 
http://git-wip-us.apache.org/repos/asf/kylin/blob/248b181c/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 7b92608..fccab30 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -21,6 +21,7 @@ package org.apache.kylin.job.execution; import java.util.List; import java.util.Map; +import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.ExecuteException; import com.google.common.collect.Lists; @@ -36,6 +37,13 @@ public class DefaultChainedExecutable extends AbstractExecutable implements Chai super(); } +protected void initConfig(KylinConfig config) { +super.initConfig(config); +for (AbstractExecutable sub : subTasks) { +sub.initConfig(config); +} +} + @Override protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { List executables = getTasks();
[07/18] kylin git commit: KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value
KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e7a20a06 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e7a20a06 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e7a20a06 Branch: refs/heads/KYLIN-2006 Commit: e7a20a063a3f007aa2a0ed5f39c616880ba46118 Parents: 4d9a923 Author: shaofengshiAuthored: Tue Nov 8 14:23:11 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 8 .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 40 +--- 2 files changed, 18 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 5b00381..b09e614 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -152,14 +152,6 @@ public class FactDistinctColumnsReducer extends KylinReducer 1) { -colValues.set(1, value); -} else { -colValues.add(value); -} } else { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); if (colValues.size() == 100) { //spill every 1 million http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index d285799..4d71f5d 100644 --- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.text.ParseException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -82,23 +83,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); -final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); -final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); -final ReadableTable.TableReader tableReader = readableTable.getReader(); -String minValue = null, maxValue = null; -try { -while (tableReader.next()) { -if (minValue == null) { -minValue = tableReader.getRow()[0]; -} -maxValue = tableReader.getRow()[0]; -} -} finally { -IOUtils.closeQuietly(tableReader); -} - final DataType partitionColType = partitionCol.getType(); -FastDateFormat dateFormat; +final FastDateFormat dateFormat; if (partitionColType.isDate()) { dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { @@ -113,14 +99,24 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); } +final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); +final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); +final ReadableTable.TableReader tableReader =
[06/18] kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer
KYLIN-2170 fix cleanup() in mapper and reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47de9611 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47de9611 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47de9611 Branch: refs/heads/KYLIN-2006 Commit: 47de9611be71e3adeebe5da5041b37db5de9fa28 Parents: d5d2b9c Author: Li YangAuthored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 18:56:33 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 39 ++-- .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++- .../engine/mr/steps/InMemCuboidMapper.java | 24 +--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java| 22 --- .../cardinality/ColumnCardinalityReducer.java | 39 ++-- .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++- 8 files changed, 137 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index c8624bb..ecbc6c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -176,25 +176,28 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { -outputDistinctValues(col, colValues, context); -colValues.clear(); -} -} else { -//output the hll info; -long grandTotal = 0; -for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { -grandTotal += hll.getCountEstimate(); +try { +if (isStatistics == false) { +if (colValues.size() > 0) { +outputDistinctValues(col, colValues, 
context); +colValues.clear(); +} +} else { +//output the hll info; +long grandTotal = 0; +for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { +grandTotal += hll.getCountEstimate(); +} +double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + +int mapperNumber = baseCuboidRowCountInMappers.size(); + +writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // +cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } -double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - -int mapperNumber = baseCuboidRowCountInMappers.size(); - -writeMapperAndCuboidStatistics(context); // for human check -CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // -cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); +} catch (Throwable ex) { +logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 86ef487..177c9f6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -197,22 +197,26 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap @Override protected void cleanup(Context context) throws IOException, InterruptedException { -if (collectStatistics) { -ByteBuffer hllBuf =
[05/18] kylin git commit: KYLIN-2162 Improve the cube validation error message
KYLIN-2162 Improve the cube validation error message Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/d5d2b9c2 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/d5d2b9c2 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/d5d2b9c2 Branch: refs/heads/KYLIN-2006 Commit: d5d2b9c28d625da54f7cbb72459382f73f5d8f9e Parents: 9615b4e Author: shaofengshiAuthored: Tue Nov 8 17:11:34 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 17:11:50 2016 +0800 -- .../org/apache/kylin/cube/model/CubeDesc.java | 64 .../validation/rule/AggregationGroupRule.java | 52 +--- .../kylin/cube/AggregationGroupRuleTest.java| 6 +- .../org/apache/kylin/cube/CubeDescTest.java | 14 ++--- 4 files changed, 92 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/d5d2b9c2/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 34650f4..94b41a3 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -39,6 +40,7 @@ import java.util.Map.Entry; import javax.annotation.Nullable; +import com.google.common.collect.Iterables; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -50,6 +52,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.JsonUtil; +import 
org.apache.kylin.common.util.Pair; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.metadata.MetadataConstants; @@ -597,8 +600,15 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } if (!includeDims.containsAll(mandatoryDims) || !includeDims.containsAll(hierarchyDims) || !includeDims.containsAll(jointDims)) { -logger.error("Aggregation group " + index + " Include dims not containing all the used dims"); -throw new IllegalStateException("Aggregation group " + index + " Include dims not containing all the used dims"); +List notIncluded = Lists.newArrayList(); +final Iterable all = Iterables.unmodifiableIterable(Iterables.concat(mandatoryDims, hierarchyDims, jointDims)); +for (String dim : all) { +if (includeDims.contains(dim) == false) { +notIncluded.add(dim); +} +} +logger.error("Aggregation group " + index + " Include dimensions not containing all the used dimensions"); +throw new IllegalStateException("Aggregation group " + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString()); } Set normalDims = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); @@ -617,33 +627,36 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } if (CollectionUtils.containsAny(mandatoryDims, hierarchyDims)) { -logger.warn("Aggregation group " + index + " mandatory dims overlap with hierarchy dims"); +logger.warn("Aggregation group " + index + " mandatory dimensions overlap with hierarchy dimensions: " + CollectionUtils.intersection(mandatoryDims, hierarchyDims)); } if (CollectionUtils.containsAny(mandatoryDims, jointDims)) { -logger.warn("Aggregation group " + index + " mandatory dims overlap with joint dims"); +logger.warn("Aggregation group " + index + " mandatory dimensions overlap with joint dimensions: " + CollectionUtils.intersection(mandatoryDims, jointDims)); } if 
(CollectionUtils.containsAny(hierarchyDims, jointDims)) { -logger.error("Aggregation group " + index + " hierarchy dims overlap with joint dims"); -throw new IllegalStateException("Aggregation group " + index + " hierarchy dims overlap with joint dims"); +
[04/18] kylin git commit: KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig
KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9615b4ea Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9615b4ea Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9615b4ea Branch: refs/heads/KYLIN-2006 Commit: 9615b4ea6f817606c93df73dcafdcb151f4e8632 Parents: 637581f Author: Li YangAuthored: Tue Nov 8 15:47:33 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 17:02:38 2016 +0800 -- .../kylin/job/common/ShellExecutable.java | 2 +- .../kylin/job/execution/AbstractExecutable.java | 37 +- .../job/execution/DefaultChainedExecutable.java | 1 - .../kylin/job/execution/ExecutableManager.java | 372 ++ .../job/impl/threadpool/DefaultScheduler.java | 2 +- .../kylin/job/manager/ExecutableManager.java| 377 --- .../apache/kylin/job/ExecutableManagerTest.java | 2 +- .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../org/apache/kylin/engine/mr/CubingJob.java | 2 +- .../engine/mr/common/MapReduceExecutable.java | 16 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +- .../kylin/engine/mr/steps/InMemCuboidJob.java | 2 +- .../engine/mr/steps/SaveStatisticsStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 2 +- .../kylin/provision/BuildCubeWithEngine.java| 2 +- .../kylin/provision/BuildCubeWithStream.java| 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- .../storage/hbase/util/StorageCleanupJob.java | 2 +- .../apache/kylin/tool/JobInstanceExtractor.java | 2 +- .../apache/kylin/tool/StorageCleanupJob.java| 2 +- 21 files changed, 419 insertions(+), 416 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java 
b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java index 111c1ba..a68f242 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java @@ -52,7 +52,7 @@ public class ShellExecutable extends AbstractExecutable { logger.info("executing:" + getCmd()); final ShellExecutableLogger logger = new ShellExecutableLogger(); final Pair result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger); -executableManager.addJobInfo(getId(), logger.getInfo()); +getManager().addJobInfo(getId(), logger.getInfo()); return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond()); } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 90e4d3c..f7b8a7c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -32,7 +32,6 @@ import org.apache.kylin.common.util.MailService; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.PersistentException; import org.apache.kylin.job.impl.threadpool.DefaultContext; -import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,43 +52,51 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); protected int retry = 0; +private KylinConfig config; private String name; 
private String id; private Map params = Maps.newHashMap(); -protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - public AbstractExecutable() { setId(UUID.randomUUID().toString()); } + +void initConfig(KylinConfig config) { +Preconditions.checkState(this.config == null || this.config == config); +this.config = config; +
[09/18] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number
KYLIN-2135 Enlarge FactDistinctColumns reducer number Signed-off-by: shaofengshiProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030 Branch: refs/heads/KYLIN-2006 Commit: 74214030272ffef275ccf0359b583b3278aec468 Parents: 47de961 Author: kangkaisen Authored: Wed Oct 26 19:35:20 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../apache/kylin/common/KylinConfigBase.java| 5 ++ .../java/org/apache/kylin/cube/CubeManager.java | 35 .../kylin/engine/mr/DFSFileTableReader.java | 59 .../kylin/engine/mr/common/BatchConstants.java | 5 ++ .../mr/steps/FactDistinctColumnPartitioner.java | 11 +--- .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +- .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +- .../mr/steps/FactDistinctColumnsReducer.java| 34 ++- .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++- 9 files changed, 170 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d9d10bb..6d3e807 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true")); } +//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns +public int getUHCReducerCount() { +return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3")); +} + public String 
getOverrideHiveTableLocation(String table) { return getOptional("hive.table.location." + table.toUpperCase()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 87bb93d..9893040 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; @@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider { } return holes; } + +private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + +//UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns +public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { +List factDictCols = getAllDictColumnsOnFact(cubeDesc); +int[] uhcIndex = new int[factDictCols.size()]; + +//add GlobalDictionaryColumns +List dictionaryDescList = cubeDesc.getDictionaries(); +if (dictionaryDescList != null) { +for (DictionaryDesc dictionaryDesc : dictionaryDescList) { +if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { +for (int i = 0; i < factDictCols.size(); i++) { +if 
(factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) { +uhcIndex[i] = 1; +break; +} +} +} +} +} + +//add ShardByColumns +Set shardByColumns = cubeDesc.getShardByColumns(); +for (int i = 0; i <
[12/18] kylin git commit: KYLIN-2169 fix bug.
KYLIN-2169 fix bug.. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/82c2e5db Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/82c2e5db Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/82c2e5db Branch: refs/heads/KYLIN-2006 Commit: 82c2e5dba0d4783714c2e55c7407f3170f2e94c0 Parents: 0c6aa76 Author: Yang LiAuthored: Tue Nov 8 23:16:06 2016 +0800 Committer: Yang Li Committed: Tue Nov 8 23:16:06 2016 +0800 -- engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/82c2e5db/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index bce0433..c3ee36c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -123,7 +123,7 @@ public class CubingJob extends DefaultChainedExecutable { @Override protected Pair formatNotifications(ExecutableContext context, ExecutableState state) { CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams())); -final Output output = jobService.getOutput(getId()); +final Output output = getManager().getOutput(getId()); String logMsg; state = output.getState(); if (state != ExecutableState.ERROR && !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString())) {
[03/18] kylin git commit: Add UDF version() to get kylin version, Add UT
Add UDF version() to get kylin version, Add UT Signed-off-by: Hongbin MaUpdate kylin properties's UDF setting Signed-off-by: Hongbin Ma Add IT cases of version() and query output check. Signed-off-by: Hongbin Ma Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/637581fb Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/637581fb Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/637581fb Branch: refs/heads/KYLIN-2006 Commit: 637581fb7f9b1ae1a3abde93795755aaceea4ecf Parents: 408f9d4 Author: Yifan Zhang Authored: Tue Oct 18 16:12:07 2016 +0800 Committer: Hongbin Ma Committed: Tue Nov 8 14:50:15 2016 +0800 -- build/conf/kylin.properties | 2 ++ .../test_case_data/sandbox/kylin.properties | 1 + .../apache/kylin/query/ITKylinQueryTest.java| 28 +++- .../org/apache/kylin/query/KylinTestBase.java | 25 + .../query/sql_verifyContent/query01.sql | 21 +++ .../sql_verifyContent/query01.sql.expected.xml | 4 +++ .../org/apache/kylin/query/udf/VersionUDF.java | 27 +++ .../apache/kylin/query/udf/VersionUDFTest.java | 15 +++ 8 files changed, 122 insertions(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/637581fb/build/conf/kylin.properties -- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index 91aa5b8..e935ebf 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -135,6 +135,8 @@ kylin.table.snapshot.max_mb=300 kylin.query.scan.threshold=1000 +kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF + # 3G kylin.query.mem.budget=3221225472 http://git-wip-us.apache.org/repos/asf/kylin/blob/637581fb/examples/test_case_data/sandbox/kylin.properties -- diff --git a/examples/test_case_data/sandbox/kylin.properties b/examples/test_case_data/sandbox/kylin.properties index 43b0855..0efd5c9 100644 --- a/examples/test_case_data/sandbox/kylin.properties +++ b/examples/test_case_data/sandbox/kylin.properties @@ -104,6 +104,7 @@ 
kylin.hbase.region.count.max=5 kylin.hbase.hfile.size.gb=2 kylin.query.udf.massin=org.apache.kylin.query.udf.MassInUDF +kylin.query.udf.version=org.apache.kylin.query.udf.VersionUDF kylin.job.controller.lock=org.apache.kylin.job.lock.MockJobLock http://git-wip-us.apache.org/repos/asf/kylin/blob/637581fb/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java -- diff --git a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java index 520c5e6..9c1b640 100644 --- a/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java +++ b/kylin-it/src/test/java/org/apache/kylin/query/ITKylinQueryTest.java @@ -27,6 +27,7 @@ import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.kylin.common.KylinConfig; +import org.apache.kylin.common.KylinVersion; import org.apache.kylin.common.debug.BackdoorToggles; import org.apache.kylin.gridtable.GTScanSelfTerminatedException; import org.apache.kylin.gridtable.StorageSideBehavior; @@ -36,7 +37,9 @@ import org.apache.kylin.query.routing.rules.RemoveBlackoutRealizationsRule; import org.apache.kylin.storage.hbase.HBaseStorage; import org.dbunit.database.DatabaseConnection; import org.dbunit.database.IDatabaseConnection; +import org.dbunit.dataset.ITable; import org.junit.AfterClass; +import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Ignore; import org.junit.Rule; @@ -199,11 +202,16 @@ public class ITKylinQueryTest extends KylinTestBase { } @Test -public void testVerifyQuery() throws Exception { +public void testVerifyCountQuery() throws Exception { verifyResultRowCount(getQueryFolderPrefix() + "src/test/resources/query/sql_verifyCount"); } @Test +public void testVerifyContentQuery() throws Exception { +verifyResultContent(getQueryFolderPrefix() + "src/test/resources/query/sql_verifyContent"); +} + +@Test public void testOrderByQuery() throws Exception { 
execAndCompQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_orderby", null, true); // FIXME @@ -364,4 +372,22 @@ public class ITKylinQueryTest extends KylinTestBase { this.batchExecuteQuery(getQueryFolderPrefix() + "src/test/resources/query/sql_window"); } +@Test +public void
[01/18] kylin git commit: KYLIN-2166 Unclosed HBaseAdmin in StorageCleanupJob#cleanUnusedHBaseTables [Forced Update!]
Repository: kylin Updated Branches: refs/heads/KYLIN-2006 db9910568 -> aea46d7ce (forced update) KYLIN-2166 Unclosed HBaseAdmin in StorageCleanupJob#cleanUnusedHBaseTables Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dc1866a8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dc1866a8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dc1866a8 Branch: refs/heads/KYLIN-2006 Commit: dc1866a8f21956acea6d489e37a731e87faf4502 Parents: 7211d92 Author: lidongsjtuAuthored: Mon Nov 7 23:59:15 2016 +0800 Committer: lidongsjtu Committed: Mon Nov 7 23:59:55 2016 +0800 -- .../apache/kylin/tool/StorageCleanupJob.java| 81 ++-- 1 file changed, 40 insertions(+), 41 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/dc1866a8/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java -- diff --git a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java index 3f82e94..4252e74 100644 --- a/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java +++ b/tool/src/main/java/org/apache/kylin/tool/StorageCleanupJob.java @@ -82,55 +82,54 @@ public class StorageCleanupJob extends AbstractApplication { private void cleanUnusedHBaseTables(Configuration conf) throws IOException { CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); // get all kylin hbase tables -HBaseAdmin hbaseAdmin = new HBaseAdmin(conf); -String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; -HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); -List allTablesNeedToBeDropped = new ArrayList(); -for (HTableDescriptor desc : tableDescriptors) { -String host = desc.getValue(IRealizationConstants.HTableTag); -if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) { -//only take care htables that belongs to self, and created more than 
2 days - allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString()); +try (HBaseAdmin hbaseAdmin = new HBaseAdmin(conf)) { +String tableNamePrefix = IRealizationConstants.SharedHbaseStorageLocationPrefix; +HTableDescriptor[] tableDescriptors = hbaseAdmin.listTables(tableNamePrefix + ".*"); +List allTablesNeedToBeDropped = new ArrayList(); +for (HTableDescriptor desc : tableDescriptors) { +String host = desc.getValue(IRealizationConstants.HTableTag); +if (KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix().equalsIgnoreCase(host)) { +//only take care htables that belongs to self, and created more than 2 days + allTablesNeedToBeDropped.add(desc.getTableName().getNameAsString()); +} } -} -// remove every segment htable from drop list -for (CubeInstance cube : cubeMgr.listAllCubes()) { -for (CubeSegment seg : cube.getSegments()) { -String tablename = seg.getStorageLocationIdentifier(); -if (allTablesNeedToBeDropped.contains(tablename)) { -allTablesNeedToBeDropped.remove(tablename); -logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus()); +// remove every segment htable from drop list +for (CubeInstance cube : cubeMgr.listAllCubes()) { +for (CubeSegment seg : cube.getSegments()) { +String tablename = seg.getStorageLocationIdentifier(); +if (allTablesNeedToBeDropped.contains(tablename)) { +allTablesNeedToBeDropped.remove(tablename); +logger.info("Exclude table " + tablename + " from drop list, as the table belongs to cube " + cube.getName() + " with status " + cube.getStatus()); +} } } -} -if (delete == true) { -// drop tables -ExecutorService executorService = Executors.newSingleThreadExecutor(); -for (String htableName : allTablesNeedToBeDropped) { -FutureTask futureTask = new FutureTask(new DeleteHTableRunnable(hbaseAdmin, htableName)); -executorService.execute(futureTask); -try { -futureTask.get(deleteTimeout, TimeUnit.MINUTES); -} catch (TimeoutException e) { -
[14/18] kylin git commit: KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java new file mode 100644 index 000..dfc46b6 --- /dev/null +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/fdc2/FactDistinctHiveColumnsMapper2.java @@ -0,0 +1,232 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.engine.mr.steps.fdc2; + +import com.google.common.collect.Lists; +import com.google.common.hash.HashFunction; +import com.google.common.hash.Hasher; +import com.google.common.hash.Hashing; +import org.apache.hadoop.io.Text; +import org.apache.kylin.common.util.ByteArray; +import org.apache.kylin.common.util.Bytes; +import org.apache.kylin.cube.cuboid.CuboidScheduler; +import org.apache.kylin.engine.mr.common.BatchConstants; +import org.apache.kylin.measure.BufferedMeasureCodec; +import org.apache.kylin.measure.hllc.HyperLogLogPlusCounter; +import org.apache.kylin.metadata.datatype.DataType; +import org.apache.kylin.metadata.model.TblColRef; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.Collection; +import java.util.List; + +/** + */ +public class FactDistinctHiveColumnsMapper2 extends FactDistinctColumnsMapperBase2{ + +protected boolean collectStatistics = false; +protected CuboidScheduler cuboidScheduler = null; +protected int nRowKey; +private Integer[][] allCuboidsBitSet = null; +private HyperLogLogPlusCounter[] allCuboidsHLL = null; +private Long[] cuboidIds; +private HashFunction hf = null; +private int rowCount = 0; +private int samplingPercentage; +private ByteArray[] row_hashcodes = null; +private ByteBuffer keyBuffer; +private static final Text EMPTY_TEXT = new Text(); +public static final byte MARK_FOR_PARTITION_COL = (byte) 0xFE; +public static final byte MARK_FOR_HLL = (byte) 0xFF; + +private int partitionColumnIndex = -1; +private boolean needFetchPartitionCol = true; + +@Override +protected void setup(Context context) throws IOException { +super.setup(context); +keyBuffer = ByteBuffer.allocate(4096); +collectStatistics = Boolean.parseBoolean(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_ENABLED)); +if (collectStatistics) { +samplingPercentage = Integer.parseInt(context.getConfiguration().get(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT)); +cuboidScheduler = new 
CuboidScheduler(cubeDesc); +nRowKey = cubeDesc.getRowkey().getRowKeyColumns().length; + +List cuboidIdList = Lists.newArrayList(); +List allCuboidsBitSetList = Lists.newArrayList(); +addCuboidBitSet(baseCuboidId, allCuboidsBitSetList, cuboidIdList); + +allCuboidsBitSet = allCuboidsBitSetList.toArray(new Integer[cuboidIdList.size()][]); +cuboidIds = cuboidIdList.toArray(new Long[cuboidIdList.size()]); + +allCuboidsHLL = new HyperLogLogPlusCounter[cuboidIds.length]; +for (int i = 0; i < cuboidIds.length; i++) { +allCuboidsHLL[i] = new HyperLogLogPlusCounter(cubeDesc.getConfig().getCubeStatsHLLPrecision()); +} + +hf = Hashing.murmur3_32(); +row_hashcodes = new ByteArray[nRowKey]; +for (int i = 0; i < nRowKey; i++) { +row_hashcodes[i] = new ByteArray(); +} + +TblColRef partitionColRef = cubeDesc.getModel().getPartitionDesc().getPartitionDateColumnRef(); +if (partitionColRef != null) { +partitionColumnIndex = intermediateTableDesc.getColumnIndex(partitionColRef); +} + +// check whether need fetch the partition col values +if (partitionColumnIndex < 0) { +// if partition col not on cube, no need +
[02/18] kylin git commit: KYLIN-2165 skip redistribute if row count is 0
KYLIN-2165 skip redistribute if row count is 0 Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/408f9d43 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/408f9d43 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/408f9d43 Branch: refs/heads/KYLIN-2006 Commit: 408f9d43d309eedf19361f0a701213214e436cdd Parents: dc1866a Author: shaofengshiAuthored: Tue Nov 8 14:26:53 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 14:26:53 2016 +0800 -- .../java/org/apache/kylin/source/hive/HiveMRInput.java| 10 +++--- 1 file changed, 7 insertions(+), 3 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/408f9d43/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java -- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 67ceffc..9e9dc25 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -282,9 +282,13 @@ public class HiveMRInput implements IMRInput { try { long rowCount = computeRowCount(database, tableName); logger.debug("Row count of table '" + intermediateTable + "' is " + rowCount); -if (!config.isEmptySegmentAllowed() && rowCount == 0) { -stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); -return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); +if (rowCount == 0) { +if (!config.isEmptySegmentAllowed()) { +stepLogger.log("Detect upstream hive table is empty, " + "fail the job because \"kylin.job.allow.empty.segment\" = \"false\""); +return new ExecuteResult(ExecuteResult.State.ERROR, stepLogger.getBufferedLog()); +} else { +return new ExecuteResult(ExecuteResult.State.SUCCEED, "Row count is 0, no need to redistribute"); +} } int 
mapperInputRows = config.getHadoopJobMapperInputRows();
[18/18] kylin git commit: KYLIN-2006 fix test case
KYLIN-2006 fix test case Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/aea46d7c Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/aea46d7c Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/aea46d7c Branch: refs/heads/KYLIN-2006 Commit: aea46d7ce2d3b6de1a3b5d79bf9715b3fbfaa680 Parents: 3aad93a Author: Yang LiAuthored: Tue Nov 8 22:30:23 2016 +0800 Committer: Yang Li Committed: Wed Nov 9 00:14:29 2016 +0800 -- .../common/util/AbstractKylinTestCase.java | 4 +- .../kylin/job/execution/AbstractExecutable.java | 4 + .../kylin/job/execution/ExecutableManager.java | 4 + .../impl/threadpool/DistributedScheduler.java | 6 +- .../kylin/job/BaseTestDistributedScheduler.java | 121 +-- .../apache/kylin/job/ContextTestExecutable.java | 9 +- .../job/ITDistributedSchedulerBaseTest.java | 22 ++-- .../job/ITDistributedSchedulerTakeOverTest.java | 10 +- .../hbase/util/ZookeeperDistributedJobLock.java | 42 --- 9 files changed, 115 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java -- diff --git a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java index 14bf90b..2154c32 100644 --- a/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java +++ b/core-common/src/test/java/org/apache/kylin/common/util/AbstractKylinTestCase.java @@ -34,7 +34,9 @@ public abstract class AbstractKylinTestCase { "org.apache.kylin.storage.hybrid.HybridManager", // "org.apache.kylin.metadata.realization.RealizationRegistry", // "org.apache.kylin.metadata.project.ProjectManager", // -"org.apache.kylin.metadata.MetadataManager" // +"org.apache.kylin.metadata.MetadataManager", // +"org.apache.kylin.job.impl.threadpool.DistributedScheduler", // 
+"org.apache.kylin.job.manager.ExecutableManager", // }; public abstract void createTestMetadata() throws Exception; http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 2a4b2df..9292418 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -66,6 +66,10 @@ public abstract class AbstractExecutable implements Executable, Idempotent { this.config = config; } +protected KylinConfig getConfig() { +return config; +} + protected ExecutableManager getManager() { return ExecutableManager.getInstance(config); } http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java index 92fc8c9..1db612f 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/ExecutableManager.java @@ -65,6 +65,10 @@ public class ExecutableManager { return r; } +public static void clearCache() { +CACHE.clear(); +} + private ExecutableManager(KylinConfig config) { logger.info("Using metadata url: " + config); this.config = config; http://git-wip-us.apache.org/repos/asf/kylin/blob/aea46d7c/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 17df119..3937a24 100644 
--- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -43,11 +43,11 @@ import org.apache.kylin.job.exception.SchedulerException; import
[08/18] kylin git commit: KYLIN-2135 minor format update
KYLIN-2135 minor format update Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dd496a69 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dd496a69 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dd496a69 Branch: refs/heads/KYLIN-2006 Commit: dd496a6945ba20192c2fa2bda845c055357ab44a Parents: 7421403 Author: shaofengshiAuthored: Thu Nov 3 18:49:50 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../kylin/engine/mr/DFSFileTableReader.java | 92 ++-- .../engine/mr/steps/FactDistinctColumnsJob.java | 34 2 files changed, 61 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index dda1d6f..173c908 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -23,14 +23,15 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader { private String filePath; private String delim; -private List readerList; +private 
List readerList; private String curLine; private String[] curColumns; @@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader { this.filePath = filePath; this.delim = delim; this.expectedColumnNumber = expectedColumnNumber; -this.readerList = new ArrayList(); +this.readerList = new ArrayList(); FileSystem fs = HadoopUtil.getFileSystem(filePath); -ArrayList allFiles = new ArrayList<>(); -FileStatus status = fs.getFileStatus(new Path(filePath)); -if (status.isFile()) { -allFiles.add(status); -} else { -FileStatus[] listStatus = fs.listStatus(new Path(filePath)); -allFiles.addAll(Arrays.asList(listStatus)); -} - -try { -for (FileStatus f : allFiles) { -RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString()); -this.readerList.add(rowReader); -} +ArrayList allFiles = new ArrayList<>(); +FileStatus status = fs.getFileStatus(new Path(filePath)); +if (status.isFile()) { +allFiles.add(status); +} else { +FileStatus[] listStatus = fs.listStatus(new Path(filePath)); +allFiles.addAll(Arrays.asList(listStatus)); +} + +try { +for (FileStatus f : allFiles) { +RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString()); +this.readerList.add(rowReader); +} } catch (IOException e) { if (isExceptionSayingNotSeqFile(e) == false) throw e; -this.readerList = new ArrayList(); -for (FileStatus f : allFiles) { -RowReader rowReader = new CsvRowReader(fs, f.getPath().toString()); -this.readerList.add(rowReader); -} +this.readerList = new ArrayList(); +for (FileStatus f : allFiles) { +RowReader rowReader = new CsvRowReader(fs, f.getPath().toString()); +this.readerList.add(rowReader); +} } } @@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader { @Override public boolean next() throws IOException { -int curReaderIndex = -1; -RowReader curReader; - -while (++curReaderIndex < readerList.size()) { -curReader = readerList.get(curReaderIndex); -curLine = 
curReader.nextLine(); -curColumns = null; - -if (curLine
[15/18] kylin git commit: KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey
KYLIN-1851:unfinished add TrieDictionaryForest and NumDictionaryForest and SelfDefineSortableKey Signed-off-by: Yang LiProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f0804f95 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f0804f95 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f0804f95 Branch: refs/heads/KYLIN-2006 Commit: f0804f95ae59ef7adcc0e6e4fe9a3b3620586b96 Parents: ddec049 Author: xiefan46 <958034...@qq.com> Authored: Mon Nov 7 14:37:22 2016 +0800 Committer: Yang Li Committed: Tue Nov 8 23:23:34 2016 +0800 -- .../org/apache/kylin/dict/ByteComparator.java | 44 ++ .../kylin/dict/NumberDictionaryForest.java | 278 .../dict/NumberDictionaryForestBuilder.java | 58 ++ .../apache/kylin/dict/TrieDictionaryForest.java | 406 .../kylin/dict/TrieDictionaryForestBuilder.java | 125 .../kylin/dict/TrieDictionaryForestTest.java| 657 +++ .../fdc2/FactDistinctColumnPartitioner2.java| 47 ++ .../fdc2/FactDistinctColumnsCombiner2.java | 44 ++ .../mr/steps/fdc2/FactDistinctColumnsJob2.java | 149 + .../fdc2/FactDistinctColumnsMapperBase2.java| 102 +++ .../fdc2/FactDistinctHiveColumnsMapper2.java| 232 +++ .../mr/steps/fdc2/SelfDefineSortableKey.java| 130 .../kylin/engine/mr/steps/fdc2/TypeFlag.java| 28 + .../mr/steps/NumberDictionaryForestTest.java| 214 ++ .../mr/steps/SelfDefineSortableKeyTest.java | 228 +++ 15 files changed, 2742 insertions(+) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java -- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java new file mode 100644 index 000..74d5ec5 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ByteComparator.java @@ -0,0 +1,44 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.dict; + +import org.apache.kylin.common.util.ByteArray; + +import java.util.Comparator; + +/** + * Created by xiefan on 16-10-28. + */ +public class ByteComparator implements Comparator { +private BytesConverter converter; + +public ByteComparator(BytesConverter converter) { +this.converter = converter; +} + +@Override +public int compare(T o1, T o2) { +//return BytesUtil.safeCompareBytes(converter.convertToBytes(o1),converter.convertToBytes(o2)); +byte[] b1 = converter.convertToBytes(o1); +byte[] b2 = converter.convertToBytes(o2); +ByteArray ba1 = new ByteArray(b1, 0, b1.length); +ByteArray ba2 = new ByteArray(b2, 0, b2.length); +return ba1.compareTo(ba2); +} +} http://git-wip-us.apache.org/repos/asf/kylin/blob/f0804f95/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java -- diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java new file mode 100644 index 000..8caa4b6 --- /dev/null +++ b/core-dictionary/src/main/java/org/apache/kylin/dict/NumberDictionaryForest.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software +
[17/18] kylin git commit: KYLIN-2006 minor revision
KYLIN-2006 minor revision Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3aad93a5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3aad93a5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3aad93a5 Branch: refs/heads/KYLIN-2006 Commit: 3aad93a5d1c5dac30b25d45d72ee175948220be9 Parents: 7fe4317 Author: Yang LiAuthored: Mon Nov 7 20:59:25 2016 +0800 Committer: Yang Li Committed: Tue Nov 8 23:23:35 2016 +0800 -- .../impl/threadpool/DistributedScheduler.java | 3 +-- .../kylin/job/lock/DistributedJobLock.java | 7 +- .../org/apache/kylin/job/lock/DoWatchLock.java | 23 .../hbase/util/ZookeeperDistributedJobLock.java | 1 - 4 files changed, 7 insertions(+), 27 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3aad93a5/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java index 11709c7..17df119 100644 --- a/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java +++ b/core-job/src/main/java/org/apache/kylin/job/impl/threadpool/DistributedScheduler.java @@ -46,7 +46,6 @@ import org.apache.kylin.job.execution.Executable; import org.apache.kylin.job.execution.ExecutableState; import org.apache.kylin.job.execution.Output; import org.apache.kylin.job.lock.DistributedJobLock; -import org.apache.kylin.job.lock.DoWatchLock; import org.apache.kylin.job.lock.JobLock; import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; @@ -209,7 +208,7 @@ public class DistributedScheduler implements Scheduler, Conn } //when the segment lock released but the segment related job still running, resume the job. 
-private class DoWatchImpl implements DoWatchLock { +private class DoWatchImpl implements org.apache.kylin.job.lock.DistributedJobLock.DoWatchLock { private String serverName; public DoWatchImpl(String serverName) { http://git-wip-us.apache.org/repos/asf/kylin/blob/3aad93a5/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java index 5ba8426..9335e56 100644 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java +++ b/core-job/src/main/java/org/apache/kylin/job/lock/DistributedJobLock.java @@ -21,9 +21,14 @@ package org.apache.kylin.job.lock; import java.util.concurrent.ExecutorService; public interface DistributedJobLock extends JobLock { -boolean lockWithName(String cubeName, String serverName); + +boolean lockWithName(String name, String serverName); void unlockWithName(String name); void watchLock(ExecutorService pool, DoWatchLock doWatch); + +public interface DoWatchLock { +void doWatch(String path, String data); +} } http://git-wip-us.apache.org/repos/asf/kylin/blob/3aad93a5/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java b/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java deleted file mode 100644 index 08c13f9..000 --- a/core-job/src/main/java/org/apache/kylin/job/lock/DoWatchLock.java +++ /dev/null @@ -1,23 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. -*/ - -package org.apache.kylin.job.lock; - -public interface DoWatchLock { -void doWatch(String path, String data); -}
kylin git commit: minor, fix checkstyle and pom warning
Repository: kylin Updated Branches: refs/heads/master 82c2e5dba -> ddec049a6 minor, fix checkstyle and pom warning Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/ddec049a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/ddec049a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/ddec049a Branch: refs/heads/master Commit: ddec049a6b7cd9596a59aa8e2d4c500770bbddd3 Parents: 82c2e5d Author: Yang Li Authored: Tue Nov 8 23:23:11 2016 +0800 Committer: Yang Li Committed: Tue Nov 8 23:23:11 2016 +0800 -- .../org/apache/kylin/job/execution/DefaultChainedExecutable.java | 1 - tool/pom.xml | 4 2 files changed, 5 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/ddec049a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java index 621d51d..7b92608 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/DefaultChainedExecutable.java @@ -21,7 +21,6 @@ package org.apache.kylin.job.execution; import java.util.List; import java.util.Map; -import org.apache.kylin.common.KylinConfig; import org.apache.kylin.job.exception.ExecuteException; import com.google.common.collect.Lists; http://git-wip-us.apache.org/repos/asf/kylin/blob/ddec049a/tool/pom.xml -- diff --git a/tool/pom.xml b/tool/pom.xml index 3d341d8..38901c6 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -61,10 +61,6 @@ test-jar test - -org.apache.kylin -kylin-source-hive -
kylin git commit: KYLIN-2169 fix bug..
Repository: kylin Updated Branches: refs/heads/master 0c6aa760e -> 82c2e5dba KYLIN-2169 fix bug.. Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/82c2e5db Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/82c2e5db Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/82c2e5db Branch: refs/heads/master Commit: 82c2e5dba0d4783714c2e55c7407f3170f2e94c0 Parents: 0c6aa76 Author: Yang Li Authored: Tue Nov 8 23:16:06 2016 +0800 Committer: Yang Li Committed: Tue Nov 8 23:16:06 2016 +0800 -- engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/82c2e5db/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index bce0433..c3ee36c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@ -123,7 +123,7 @@ public class CubingJob extends DefaultChainedExecutable { @Override protected Pair formatNotifications(ExecutableContext context, ExecutableState state) { CubeInstance cubeInstance = CubeManager.getInstance(context.getConfig()).getCube(CubingExecutableUtil.getCubeName(this.getParams())); -final Output output = jobService.getOutput(getId()); +final Output output = getManager().getOutput(getId()); String logMsg; state = output.getState(); if (state != ExecutableState.ERROR && !cubeInstance.getDescriptor().getStatusNeedNotify().contains(state.toString())) {
[3/3] kylin git commit: KYLIN-1672 support kylin on cdh 5.7
KYLIN-1672 support kylin on cdh 5.7 Signed-off-by: Li YangProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/f58ece86 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/f58ece86 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/f58ece86 Branch: refs/heads/yang21-cdh5.7 Commit: f58ece86243be18c8f3be68776951b20fedc00aa Parents: 337caca Author: Lynne Jiang Authored: Mon May 16 03:33:27 2016 -0700 Committer: Hongbin Ma Committed: Tue Nov 8 22:21:13 2016 +0800 -- build/conf/kylin.properties | 3 + dev-support/test_all_against_hdp_2_2_4_2_2.sh | 0 .../kylin/engine/mr/steps/MockupMapContext.java | 15 +- examples/test_case_data/sandbox/core-site.xml | 146 +++--- examples/test_case_data/sandbox/hbase-site.xml | 162 ++ examples/test_case_data/sandbox/hdfs-site.xml | 259 ++ examples/test_case_data/sandbox/mapred-site.xml | 398 ++- examples/test_case_data/sandbox/yarn-site.xml | 496 ++- pom.xml | 16 +- server/pom.xml | 36 ++ .../storage/hbase/steps/MockupMapContext.java | 19 +- tool/pom.xml| 12 + 12 files changed, 431 insertions(+), 1131 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/f58ece86/build/conf/kylin.properties -- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index ed86bdb..ebd8cde 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -118,6 +118,9 @@ kylin.job.mapreduce.mapper.input.rows=100 kylin.job.step.timeout=7200 +# for secure cdh, filtering hive dependency is risky, so filter nothing +kylin.job.dependency.filterlist=[^,]+ + ### CUBE ### # 'auto', 'inmem', 'layer' or 'random' for testing http://git-wip-us.apache.org/repos/asf/kylin/blob/f58ece86/dev-support/test_all_against_hdp_2_2_4_2_2.sh -- diff --git a/dev-support/test_all_against_hdp_2_2_4_2_2.sh b/dev-support/test_all_against_hdp_2_2_4_2_2.sh old mode 100644 new mode 100755 
http://git-wip-us.apache.org/repos/asf/kylin/blob/f58ece86/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java -- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java index 847071d..9900465 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java @@ -77,6 +77,7 @@ public class MockupMapContext { outKV[0] = key; outKV[1] = value; } + } @Override @@ -99,6 +100,7 @@ public class MockupMapContext { throw new NotImplementedException(); } + @Override public float getProgress() { throw new NotImplementedException(); @@ -195,17 +197,17 @@ public class MockupMapContext { } @Override -public RawComparator getSortComparator() { +public boolean userClassesTakesPrecedence() { throw new NotImplementedException(); } @Override -public String getJar() { +public RawComparator getSortComparator() { throw new NotImplementedException(); } @Override -public RawComparator getGroupingComparator() { +public String getJar() { throw new NotImplementedException(); } @@ -221,7 +223,7 @@ public class MockupMapContext { @Override public boolean getProfileEnabled() { -throw new NotImplementedException(); +return false; } @Override @@ -308,6 +310,11 @@ public class MockupMapContext { public RawComparator getCombinerKeyGroupingComparator() { throw new NotImplementedException(); } + +@Override +public RawComparator getGroupingComparator() { +return null; +} }); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/f58ece86/examples/test_case_data/sandbox/core-site.xml -- diff --git a/examples/test_case_data/sandbox/core-site.xml
[1/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-cdh5.7 5ee22ec53 -> f58ece862 (forced update) http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index 0e95102..c59fb33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; @@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; -final HTableInterface hTable; +final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } -private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { +private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); -final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); +final Table hTable 
= HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 5b2441c..2f7e164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); -Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); -HBaseAdmin admin = null; +Admin admin = null; try { -admin = new HBaseAdmin(conf); +Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); +admin = conn.getAdmin(); + for (String table : oldTables) { -if (admin.tableExists(table)) { 
-HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); +if (admin.tableExists(TableName.valueOf(table))) { +HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { -
[2/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/337caca9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/337caca9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/337caca9 Branch: refs/heads/yang21-cdh5.7 Commit: 337caca973597e346cbc736b815d9880e5a6fff8 Parents: 3b07c26 Author: shaofengshiAuthored: Wed Mar 23 17:07:05 2016 +0800 Committer: Hongbin Ma Committed: Tue Nov 8 22:19:21 2016 +0800 -- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../kylin/provision/BuildCubeWithEngine.java| 13 ++- pom.xml | 12 +-- .../kylin/rest/security/AclHBaseStorage.java| 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 36 +++- .../apache/kylin/rest/service/QueryService.java | 24 +++-- .../apache/kylin/rest/service/UserService.java | 17 ++-- .../kylin/storage/hbase/HBaseConnection.java| 44 - .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++ .../kylin/storage/hbase/HBaseStorage.java | 3 +- .../storage/hbase/cube/SimpleHBaseStore.java| 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- .../hbase/cube/v1/RegionScannerAdapter.java | 10 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java| 14 ++- .../observer/ObserverAggregationCache.java | 10 ++- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 23 
++--- .../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 23 ++--- .../storage/hbase/util/CleanHtableCLI.java | 12 +-- .../storage/hbase/util/CubeMigrationCLI.java| 36 .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java| 22 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++ .../kylin/storage/hbase/util/HBaseClean.java| 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 35 .../kylin/storage/hbase/util/HBaseUsage.java| 9 +- .../storage/hbase/util/HbaseStreamingInput.java | 30 +++ .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 20 +++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 26 ++ .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 36 .../kylin/tool/CubeMigrationCheckCLI.java | 16 ++-- .../kylin/tool/ExtendCubeToHybridCLI.java | 8 +- .../apache/kylin/tool/StorageCleanupJob.java| 20 +++-- 53 files changed, 450 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/examples/test_case_data/sandbox/hbase-site.xml -- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ zookeeper.znode.parent /hbase-unsecure - -hbase.client.pause -100 -General client pause value. Used mostly as value to wait -before running a retry of a failed get, region lookup, etc. 
-See hbase.client.retries.number for description of how we backoff from -this initial pause amount and how this pause works w/ retries. - - -hbase.client.retries.number -5 -Maximum retries. Used as maximum for all retryable -operations such as the getting of a cell's value, starting a row update, -
[1/2] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-hbase1.x 3bfe52498 -> 337caca97 (forced update) http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index 0e95102..c59fb33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; @@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; -final HTableInterface hTable; +final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } -private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { +private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); -final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); +final Table 
hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 5b2441c..2f7e164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); -Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); -HBaseAdmin admin = null; +Admin admin = null; try { -admin = new HBaseAdmin(conf); +Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); +admin = conn.getAdmin(); + for (String table : oldTables) { -if (admin.tableExists(table)) { 
-HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); +if (admin.tableExists(TableName.valueOf(table))) { +HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { -
[2/2] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/337caca9 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/337caca9 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/337caca9 Branch: refs/heads/yang21-hbase1.x Commit: 337caca973597e346cbc736b815d9880e5a6fff8 Parents: 3b07c26 Author: shaofengshiAuthored: Wed Mar 23 17:07:05 2016 +0800 Committer: Hongbin Ma Committed: Tue Nov 8 22:19:21 2016 +0800 -- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../kylin/provision/BuildCubeWithEngine.java| 13 ++- pom.xml | 12 +-- .../kylin/rest/security/AclHBaseStorage.java| 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 36 +++- .../apache/kylin/rest/service/QueryService.java | 24 +++-- .../apache/kylin/rest/service/UserService.java | 17 ++-- .../kylin/storage/hbase/HBaseConnection.java| 44 - .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++ .../kylin/storage/hbase/HBaseStorage.java | 3 +- .../storage/hbase/cube/SimpleHBaseStore.java| 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- .../hbase/cube/v1/RegionScannerAdapter.java | 10 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java| 14 ++- .../observer/ObserverAggregationCache.java | 10 ++- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 23 
++--- .../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 23 ++--- .../storage/hbase/util/CleanHtableCLI.java | 12 +-- .../storage/hbase/util/CubeMigrationCLI.java| 36 .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java| 22 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++ .../kylin/storage/hbase/util/HBaseClean.java| 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 35 .../kylin/storage/hbase/util/HBaseUsage.java| 9 +- .../storage/hbase/util/HbaseStreamingInput.java | 30 +++ .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 20 +++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 26 ++ .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 36 .../kylin/tool/CubeMigrationCheckCLI.java | 16 ++-- .../kylin/tool/ExtendCubeToHybridCLI.java | 8 +- .../apache/kylin/tool/StorageCleanupJob.java| 20 +++-- 53 files changed, 450 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/337caca9/examples/test_case_data/sandbox/hbase-site.xml -- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ zookeeper.znode.parent /hbase-unsecure - -hbase.client.pause -100 -General client pause value. Used mostly as value to wait -before running a retry of a failed get, region lookup, etc. 
-See hbase.client.retries.number for description of how we backoff from -this initial pause amount and how this pause works w/ retries. - - -hbase.client.retries.number -5 -Maximum retries. Used as maximum for all retryable -operations such as the getting of a cell's value, starting a row update, -
[1/4] kylin git commit: KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value
Repository: kylin Updated Branches: refs/heads/master 47de9611b -> e7a20a063 KYLIN-2167 FactDistinctColumnsReducer may get wrong max/min partition col value Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e7a20a06 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e7a20a06 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e7a20a06 Branch: refs/heads/master Commit: e7a20a063a3f007aa2a0ed5f39c616880ba46118 Parents: 4d9a923 Author: shaofengshiAuthored: Tue Nov 8 14:23:11 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 8 .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 40 +--- 2 files changed, 18 insertions(+), 30 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index 5b00381..b09e614 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -152,14 +152,6 @@ public class FactDistinctColumnsReducer extends KylinReducer 1) { -colValues.set(1, value); -} else { -colValues.add(value); -} } else { colValues.add(new ByteArray(Bytes.copy(key.getBytes(), 1, key.getLength() - 1))); if (colValues.size() == 100) { //spill every 1 million http://git-wip-us.apache.org/repos/asf/kylin/blob/e7a20a06/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index d285799..4d71f5d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; +import java.text.ParseException; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; @@ -82,23 +83,8 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); -final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); -final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); -final ReadableTable.TableReader tableReader = readableTable.getReader(); -String minValue = null, maxValue = null; -try { -while (tableReader.next()) { -if (minValue == null) { -minValue = tableReader.getRow()[0]; -} -maxValue = tableReader.getRow()[0]; -} -} finally { -IOUtils.closeQuietly(tableReader); -} - final DataType partitionColType = partitionCol.getType(); -FastDateFormat dateFormat; +final FastDateFormat dateFormat; if (partitionColType.isDate()) { dateFormat = DateFormat.getDateFormat(DateFormat.DEFAULT_DATE_PATTERN); } else if (partitionColType.isDatetime() || partitionColType.isTimestamp()) { @@ -113,14 +99,24 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { throw new IllegalStateException("Type " + partitionColType + " is not valid partition column type"); } +final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); +final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(),
[3/4] kylin git commit: KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure
KYLIN-2135 update UpdateCubeInfoAfterBuildStep for the new folder structure Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/4d9a9231 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/4d9a9231 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/4d9a9231 Branch: refs/heads/master Commit: 4d9a92319ae6f0f778328f06d153cc6a7c9c93a8 Parents: dd496a6 Author: shaofengshiAuthored: Tue Nov 8 13:54:35 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 35 +++- 1 file changed, 12 insertions(+), 23 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d9a9231/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java index f7af42e..d285799 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateCubeInfoAfterBuildStep.java @@ -18,22 +18,17 @@ package org.apache.kylin.engine.mr.steps; -import java.io.BufferedReader; import java.io.IOException; -import java.io.InputStreamReader; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.time.FastDateFormat; -import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; import org.apache.kylin.common.util.DateFormat; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.engine.mr.CubingJob; -import org.apache.kylin.engine.mr.HadoopUtil; +import org.apache.kylin.engine.mr.DFSFileTable; import 
org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; @@ -41,6 +36,7 @@ import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.apache.kylin.metadata.datatype.DataType; import org.apache.kylin.metadata.model.TblColRef; +import org.apache.kylin.source.ReadableTable; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -86,26 +82,19 @@ public class UpdateCubeInfoAfterBuildStep extends AbstractExecutable { private void updateTimeRange(CubeSegment segment) throws IOException { final TblColRef partitionCol = segment.getCubeDesc().getModel().getPartitionDesc().getPartitionDateColumnRef(); -final String outputPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); -final Path outputFile = new Path(outputPath, partitionCol.getName()); - -String minValue = null, maxValue = null, currentValue = null; -FSDataInputStream inputStream = null; -BufferedReader bufferedReader = null; +final String factDistinctPath = this.getParams().get(BatchConstants.CFG_OUTPUT_PATH); +final ReadableTable readableTable = new DFSFileTable(factDistinctPath + "/" + partitionCol.getName(), -1); +final ReadableTable.TableReader tableReader = readableTable.getReader(); +String minValue = null, maxValue = null; try { -FileSystem fs = HadoopUtil.getFileSystem(outputPath); -inputStream = fs.open(outputFile); -bufferedReader = new BufferedReader(new InputStreamReader(inputStream)); -minValue = currentValue = bufferedReader.readLine(); -while (currentValue != null) { -maxValue = currentValue; -currentValue = bufferedReader.readLine(); +while (tableReader.next()) { +if (minValue == null) { +minValue = tableReader.getRow()[0]; +} +maxValue = tableReader.getRow()[0]; } -} catch (IOException e) { -throw e; } finally { -IOUtils.closeQuietly(bufferedReader); -IOUtils.closeQuietly(inputStream); +IOUtils.closeQuietly(tableReader); } 
final DataType partitionColType = partitionCol.getType();
[4/4] kylin git commit: KYLIN-2135 Enlarge FactDistinctColumns reducer number
KYLIN-2135 Enlarge FactDistinctColumns reducer number Signed-off-by: shaofengshiProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/74214030 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/74214030 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/74214030 Branch: refs/heads/master Commit: 74214030272ffef275ccf0359b583b3278aec468 Parents: 47de961 Author: kangkaisen Authored: Wed Oct 26 19:35:20 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../apache/kylin/common/KylinConfigBase.java| 5 ++ .../java/org/apache/kylin/cube/CubeManager.java | 35 .../kylin/engine/mr/DFSFileTableReader.java | 59 .../kylin/engine/mr/common/BatchConstants.java | 5 ++ .../mr/steps/FactDistinctColumnPartitioner.java | 11 +--- .../engine/mr/steps/FactDistinctColumnsJob.java | 18 +- .../mr/steps/FactDistinctColumnsMapperBase.java | 17 +- .../mr/steps/FactDistinctColumnsReducer.java| 34 ++- .../mr/steps/FactDistinctHiveColumnsMapper.java | 12 +++- 9 files changed, 170 insertions(+), 26 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java -- diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index d9d10bb..6d3e807 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -374,6 +374,11 @@ abstract public class KylinConfigBase implements Serializable { return Boolean.parseBoolean(getOptional("kylin.job.allow.empty.segment", "true")); } +//UHC: ultra high cardinality columns, contain the ShardByColumns and the GlobalDictionaryColumns +public int getUHCReducerCount() { +return Integer.parseInt(getOptional("kylin.job.uhc.reducer.count", "3")); +} + public String 
getOverrideHiveTableLocation(String table) { return getOptional("hive.table.location." + table.toUpperCase()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/74214030/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java index 87bb93d..9893040 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/CubeManager.java @@ -31,6 +31,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Random; +import java.util.Set; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; @@ -43,6 +44,7 @@ import org.apache.kylin.common.persistence.Serializer; import org.apache.kylin.common.util.Dictionary; import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.model.CubeDesc; +import org.apache.kylin.cube.model.DictionaryDesc; import org.apache.kylin.cube.model.DimensionDesc; import org.apache.kylin.dict.DictionaryInfo; import org.apache.kylin.dict.DictionaryManager; @@ -1049,4 +1051,37 @@ public class CubeManager implements IRealizationProvider { } return holes; } + +private final String GLOBAL_DICTIONNARY_CLASS = "org.apache.kylin.dict.GlobalDictionaryBuilder"; + +//UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns +public int[] getUHCIndex(CubeDesc cubeDesc) throws IOException { +List factDictCols = getAllDictColumnsOnFact(cubeDesc); +int[] uhcIndex = new int[factDictCols.size()]; + +//add GlobalDictionaryColumns +List dictionaryDescList = cubeDesc.getDictionaries(); +if (dictionaryDescList != null) { +for (DictionaryDesc dictionaryDesc : dictionaryDescList) { +if (dictionaryDesc.getBuilderClass() != null && dictionaryDesc.getBuilderClass().equalsIgnoreCase(GLOBAL_DICTIONNARY_CLASS)) { +for (int i = 0; i < factDictCols.size(); i++) { +if 
(factDictCols.get(i).equals(dictionaryDesc.getColumnRef())) { +uhcIndex[i] = 1; +break; +} +} +} +} +} + +//add ShardByColumns +Set shardByColumns = cubeDesc.getShardByColumns(); +for (int i = 0; i <
[2/4] kylin git commit: KYLIN-2135 minor format update
KYLIN-2135 minor format update Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/dd496a69 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/dd496a69 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/dd496a69 Branch: refs/heads/master Commit: dd496a6945ba20192c2fa2bda845c055357ab44a Parents: 7421403 Author: shaofengshiAuthored: Thu Nov 3 18:49:50 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 21:29:22 2016 +0800 -- .../kylin/engine/mr/DFSFileTableReader.java | 92 ++-- .../engine/mr/steps/FactDistinctColumnsJob.java | 34 2 files changed, 61 insertions(+), 65 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/dd496a69/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java index dda1d6f..173c908 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/DFSFileTableReader.java @@ -23,14 +23,15 @@ import java.io.Closeable; import java.io.EOFException; import java.io.IOException; import java.io.InputStreamReader; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import org.apache.commons.io.IOUtils; import org.apache.commons.lang.StringEscapeUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; -import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.SequenceFile; @@ -57,7 +58,7 @@ public class DFSFileTableReader implements TableReader { private String filePath; private String delim; -private List readerList; +private List 
readerList; private String curLine; private String[] curColumns; @@ -72,33 +73,33 @@ public class DFSFileTableReader implements TableReader { this.filePath = filePath; this.delim = delim; this.expectedColumnNumber = expectedColumnNumber; -this.readerList = new ArrayList(); +this.readerList = new ArrayList(); FileSystem fs = HadoopUtil.getFileSystem(filePath); -ArrayList allFiles = new ArrayList<>(); -FileStatus status = fs.getFileStatus(new Path(filePath)); -if (status.isFile()) { -allFiles.add(status); -} else { -FileStatus[] listStatus = fs.listStatus(new Path(filePath)); -allFiles.addAll(Arrays.asList(listStatus)); -} - -try { -for (FileStatus f : allFiles) { -RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString()); -this.readerList.add(rowReader); -} +ArrayList allFiles = new ArrayList<>(); +FileStatus status = fs.getFileStatus(new Path(filePath)); +if (status.isFile()) { +allFiles.add(status); +} else { +FileStatus[] listStatus = fs.listStatus(new Path(filePath)); +allFiles.addAll(Arrays.asList(listStatus)); +} + +try { +for (FileStatus f : allFiles) { +RowReader rowReader = new SeqRowReader(HadoopUtil.getCurrentConfiguration(), fs, f.getPath().toString()); +this.readerList.add(rowReader); +} } catch (IOException e) { if (isExceptionSayingNotSeqFile(e) == false) throw e; -this.readerList = new ArrayList(); -for (FileStatus f : allFiles) { -RowReader rowReader = new CsvRowReader(fs, f.getPath().toString()); -this.readerList.add(rowReader); -} +this.readerList = new ArrayList(); +for (FileStatus f : allFiles) { +RowReader rowReader = new CsvRowReader(fs, f.getPath().toString()); +this.readerList.add(rowReader); +} } } @@ -114,20 +115,20 @@ public class DFSFileTableReader implements TableReader { @Override public boolean next() throws IOException { -int curReaderIndex = -1; -RowReader curReader; - -while (++curReaderIndex < readerList.size()) { -curReader = readerList.get(curReaderIndex); -curLine = 
curReader.nextLine(); -curColumns = null; - -if (curLine !=
[4/4] kylin git commit: KYLIN-1672 support kylin on cdh 5.7
KYLIN-1672 support kylin on cdh 5.7 Signed-off-by: Li YangProject: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5ee22ec5 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5ee22ec5 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5ee22ec5 Branch: refs/heads/yang21-cdh5.7 Commit: 5ee22ec53805b12377c2ba4e34429d019f626edd Parents: 3bfe524 Author: Lynne Jiang Authored: Mon May 16 03:33:27 2016 -0700 Committer: Li Yang Committed: Tue Nov 8 20:28:12 2016 + -- build/conf/kylin.properties | 3 + dev-support/test_all_against_hdp_2_2_4_2_2.sh | 0 .../kylin/engine/mr/steps/MockupMapContext.java | 15 +- examples/test_case_data/sandbox/core-site.xml | 146 +++--- examples/test_case_data/sandbox/hbase-site.xml | 162 ++ examples/test_case_data/sandbox/hdfs-site.xml | 259 ++ examples/test_case_data/sandbox/mapred-site.xml | 398 ++- examples/test_case_data/sandbox/yarn-site.xml | 496 ++- pom.xml | 16 +- server/pom.xml | 36 ++ .../storage/hbase/steps/MockupMapContext.java | 19 +- tool/pom.xml| 12 + 12 files changed, 431 insertions(+), 1131 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee22ec5/build/conf/kylin.properties -- diff --git a/build/conf/kylin.properties b/build/conf/kylin.properties index ed86bdb..ebd8cde 100644 --- a/build/conf/kylin.properties +++ b/build/conf/kylin.properties @@ -118,6 +118,9 @@ kylin.job.mapreduce.mapper.input.rows=100 kylin.job.step.timeout=7200 +# for secure cdh, filtering hive dependency is risky, so filter nothing +kylin.job.dependency.filterlist=[^,]+ + ### CUBE ### # 'auto', 'inmem', 'layer' or 'random' for testing http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee22ec5/dev-support/test_all_against_hdp_2_2_4_2_2.sh -- diff --git a/dev-support/test_all_against_hdp_2_2_4_2_2.sh b/dev-support/test_all_against_hdp_2_2_4_2_2.sh old mode 100644 new mode 100755 
http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee22ec5/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java -- diff --git a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java index 847071d..9900465 100644 --- a/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java +++ b/engine-mr/src/test/java/org/apache/kylin/engine/mr/steps/MockupMapContext.java @@ -77,6 +77,7 @@ public class MockupMapContext { outKV[0] = key; outKV[1] = value; } + } @Override @@ -99,6 +100,7 @@ public class MockupMapContext { throw new NotImplementedException(); } + @Override public float getProgress() { throw new NotImplementedException(); @@ -195,17 +197,17 @@ public class MockupMapContext { } @Override -public RawComparator getSortComparator() { +public boolean userClassesTakesPrecedence() { throw new NotImplementedException(); } @Override -public String getJar() { +public RawComparator getSortComparator() { throw new NotImplementedException(); } @Override -public RawComparator getGroupingComparator() { +public String getJar() { throw new NotImplementedException(); } @@ -221,7 +223,7 @@ public class MockupMapContext { @Override public boolean getProfileEnabled() { -throw new NotImplementedException(); +return false; } @Override @@ -308,6 +310,11 @@ public class MockupMapContext { public RawComparator getCombinerKeyGroupingComparator() { throw new NotImplementedException(); } + +@Override +public RawComparator getGroupingComparator() { +return null; +} }); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/5ee22ec5/examples/test_case_data/sandbox/core-site.xml -- diff --git a/examples/test_case_data/sandbox/core-site.xml
[1/4] kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-cdh5.7 f6e377d82 -> 5ee22ec53 (forced update) KYLIN-2170 fix cleanup() in mapper and reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3b07c26a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3b07c26a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3b07c26a Branch: refs/heads/yang21-cdh5.7 Commit: 3b07c26a0e633d3d367bce52cd7496cd1e860d0f Parents: 1720f1e Author: Li YangAuthored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 18:46:21 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 39 ++-- .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++- .../engine/mr/steps/InMemCuboidMapper.java | 24 +--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java| 22 --- .../cardinality/ColumnCardinalityReducer.java | 39 ++-- .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++- 8 files changed, 137 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index a7b2e56..a9c5d4b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -162,25 +162,28 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { -outputDistinctValues(col, colValues, context); -colValues.clear(); -} -} else { -//output the hll info; -long grandTotal = 0; -for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { -grandTotal += hll.getCountEstimate(); 
+try { +if (isStatistics == false) { +if (!outputTouched || colValues.size() > 0) { +outputDistinctValues(col, colValues, context); +colValues.clear(); +} +} else { +//output the hll info; +long grandTotal = 0; +for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { +grandTotal += hll.getCountEstimate(); +} +double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + +int mapperNumber = baseCuboidRowCountInMappers.size(); + +writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // +cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } -double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - -int mapperNumber = baseCuboidRowCountInMappers.size(); - -writeMapperAndCuboidStatistics(context); // for human check -CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // -cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); +} catch (Throwable ex) { +logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 5e278f8..2154bc6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -158,22 +158,26 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap @Override
[2/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
http://git-wip-us.apache.org/repos/asf/kylin/blob/3bfe5249/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java index 0e95102..c59fb33 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseStreamingOutput.java @@ -27,7 +27,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hbase.client.HTableInterface; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Table; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.cube.CubeSegment; @@ -56,7 +57,7 @@ public class HBaseStreamingOutput implements IStreamingOutput { try { CubeSegment cubeSegment = (CubeSegment) buildable; -final HTableInterface hTable; +final Table hTable; hTable = createHTable(cubeSegment); List cuboidWriters = Lists.newArrayList(); cuboidWriters.add(new HBaseCuboidWriter(cubeSegment, hTable)); @@ -88,10 +89,10 @@ public class HBaseStreamingOutput implements IStreamingOutput { } } -private HTableInterface createHTable(final CubeSegment cubeSegment) throws IOException { +private Table createHTable(final CubeSegment cubeSegment) throws IOException { final String hTableName = cubeSegment.getStorageLocationIdentifier(); CubeHTableUtil.createHTable(cubeSegment, null); -final HTableInterface hTable = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(hTableName); +final Table hTable = 
HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()).getTable(TableName.valueOf(hTableName)); logger.info("hTable:" + hTableName + " for segment:" + cubeSegment.getName() + " created!"); return hTable; } http://git-wip-us.apache.org/repos/asf/kylin/blob/3bfe5249/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java -- diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java index 5b2441c..2f7e164 100644 --- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java +++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/MergeGCStep.java @@ -24,11 +24,11 @@ import java.util.Collections; import java.util.List; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HTableDescriptor; -import org.apache.hadoop.hbase.client.HBaseAdmin; +import org.apache.hadoop.hbase.TableName; +import org.apache.hadoop.hbase.client.Admin; +import org.apache.hadoop.hbase.client.Connection; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.Bytes; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -69,19 +69,20 @@ public class MergeGCStep extends AbstractExecutable { List oldTables = getOldHTables(); if (oldTables != null && oldTables.size() > 0) { String metadataUrlPrefix = KylinConfig.getInstanceFromEnv().getMetadataUrlPrefix(); -Configuration conf = HBaseConnection.getCurrentHBaseConfiguration(); -HBaseAdmin admin = null; +Admin admin = null; try { -admin = new HBaseAdmin(conf); +Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); +admin = conn.getAdmin(); + for (String table : oldTables) { -if (admin.tableExists(table)) { 
-HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes.toBytes(table)); +if (admin.tableExists(TableName.valueOf(table))) { +HTableDescriptor tableDescriptor = admin.getTableDescriptor(TableName.valueOf((table))); String host = tableDescriptor.getValue(IRealizationConstants.HTableTag); if (metadataUrlPrefix.equalsIgnoreCase(host)) { -if (admin.isTableEnabled(table)) { -admin.disableTable(table); +
[3/3] kylin git commit: KYLIN-1528 Create a branch for v1.5 with HBase 1.x API
KYLIN-1528 Create a branch for v1.5 with HBase 1.x API Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3bfe5249 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3bfe5249 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3bfe5249 Branch: refs/heads/yang21-hbase1.x Commit: 3bfe52498d5d547d4201028570b16cf8f0873e0a Parents: 3b07c26 Author: shaofengshiAuthored: Wed Mar 23 17:07:05 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 20:22:28 2016 + -- examples/test_case_data/sandbox/hbase-site.xml | 19 +--- .../kylin/provision/BuildCubeWithEngine.java| 13 ++- pom.xml | 12 +-- .../kylin/rest/security/AclHBaseStorage.java| 4 +- .../rest/security/MockAclHBaseStorage.java | 8 +- .../apache/kylin/rest/security/MockHTable.java | 95 .../rest/security/RealAclHBaseStorage.java | 9 +- .../apache/kylin/rest/service/AclService.java | 25 +++--- .../apache/kylin/rest/service/CubeService.java | 36 +++- .../apache/kylin/rest/service/QueryService.java | 24 +++-- .../apache/kylin/rest/service/UserService.java | 17 ++-- .../kylin/storage/hbase/HBaseConnection.java| 44 - .../kylin/storage/hbase/HBaseResourceStore.java | 31 +++ .../kylin/storage/hbase/HBaseStorage.java | 3 +- .../storage/hbase/cube/SimpleHBaseStore.java| 20 ++--- .../hbase/cube/v1/CubeSegmentTupleIterator.java | 11 +-- .../storage/hbase/cube/v1/CubeStorageQuery.java | 6 +- .../hbase/cube/v1/RegionScannerAdapter.java | 10 ++- .../cube/v1/SerializedHBaseTupleIterator.java | 4 +- .../observer/AggregateRegionObserver.java | 4 +- .../observer/AggregationScanner.java| 14 ++- .../observer/ObserverAggregationCache.java | 10 ++- .../coprocessor/observer/ObserverEnabler.java | 4 +- .../hbase/cube/v2/CubeHBaseEndpointRPC.java | 13 +-- .../storage/hbase/cube/v2/CubeHBaseScanRPC.java | 9 +- .../coprocessor/endpoint/CubeVisitService.java | 4 +- .../storage/hbase/steps/CubeHTableUtil.java | 16 ++-- .../storage/hbase/steps/DeprecatedGCStep.java | 23 ++--- 
.../storage/hbase/steps/HBaseCuboidWriter.java | 7 +- .../hbase/steps/HBaseStreamingOutput.java | 9 +- .../kylin/storage/hbase/steps/MergeGCStep.java | 23 ++--- .../storage/hbase/util/CleanHtableCLI.java | 12 +-- .../storage/hbase/util/CubeMigrationCLI.java| 36 .../hbase/util/CubeMigrationCheckCLI.java | 17 ++-- .../hbase/util/DeployCoprocessorCLI.java| 22 ++--- .../hbase/util/ExtendCubeToHybridCLI.java | 8 +- .../hbase/util/GridTableHBaseBenchmark.java | 34 +++ .../kylin/storage/hbase/util/HBaseClean.java| 18 ++-- .../hbase/util/HBaseRegionSizeCalculator.java | 35 .../kylin/storage/hbase/util/HBaseUsage.java| 9 +- .../storage/hbase/util/HbaseStreamingInput.java | 30 +++ .../hbase/util/HtableAlterMetadataCLI.java | 9 +- .../storage/hbase/util/OrphanHBaseCleanJob.java | 19 ++-- .../kylin/storage/hbase/util/PingHBaseCLI.java | 15 ++-- .../kylin/storage/hbase/util/RowCounterCLI.java | 11 +-- .../storage/hbase/util/StorageCleanupJob.java | 20 +++-- .../storage/hbase/util/UpdateHTableHostCLI.java | 17 ++-- .../observer/AggregateRegionObserverTest.java | 26 ++ .../v1/filter/TestFuzzyRowFilterV2EndToEnd.java | 5 +- .../org/apache/kylin/tool/CubeMigrationCLI.java | 36 .../kylin/tool/CubeMigrationCheckCLI.java | 16 ++-- .../kylin/tool/ExtendCubeToHybridCLI.java | 8 +- .../apache/kylin/tool/StorageCleanupJob.java| 20 +++-- 53 files changed, 450 insertions(+), 500 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3bfe5249/examples/test_case_data/sandbox/hbase-site.xml -- diff --git a/examples/test_case_data/sandbox/hbase-site.xml b/examples/test_case_data/sandbox/hbase-site.xml index 46d5345..734908e 100644 --- a/examples/test_case_data/sandbox/hbase-site.xml +++ b/examples/test_case_data/sandbox/hbase-site.xml @@ -190,22 +190,5 @@ zookeeper.znode.parent /hbase-unsecure - -hbase.client.pause -100 -General client pause value. Used mostly as value to wait -before running a retry of a failed get, region lookup, etc. 
-See hbase.client.retries.number for description of how we backoff from -this initial pause amount and how this pause works w/ retries. - - -hbase.client.retries.number -5 -Maximum retries. Used as maximum for all retryable -operations such as the getting of a cell's value, starting a row update, -
[1/3] kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer [Forced Update!]
Repository: kylin Updated Branches: refs/heads/yang21-hbase1.x a3fd7d5e4 -> 3bfe52498 (forced update) KYLIN-2170 fix cleanup() in mapper and reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3b07c26a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3b07c26a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3b07c26a Branch: refs/heads/yang21-hbase1.x Commit: 3b07c26a0e633d3d367bce52cd7496cd1e860d0f Parents: 1720f1e Author: Li YangAuthored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 18:46:21 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 39 ++-- .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++- .../engine/mr/steps/InMemCuboidMapper.java | 24 +--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java| 22 --- .../cardinality/ColumnCardinalityReducer.java | 39 ++-- .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++- 8 files changed, 137 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index a7b2e56..a9c5d4b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -162,25 +162,28 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { -outputDistinctValues(col, colValues, context); -colValues.clear(); -} -} else { -//output the hll info; -long grandTotal = 0; -for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { -grandTotal += hll.getCountEstimate(); 
+try { +if (isStatistics == false) { +if (!outputTouched || colValues.size() > 0) { +outputDistinctValues(col, colValues, context); +colValues.clear(); +} +} else { +//output the hll info; +long grandTotal = 0; +for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { +grandTotal += hll.getCountEstimate(); +} +double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + +int mapperNumber = baseCuboidRowCountInMappers.size(); + +writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // +cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } -double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - -int mapperNumber = baseCuboidRowCountInMappers.size(); - -writeMapperAndCuboidStatistics(context); // for human check -CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // -cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); +} catch (Throwable ex) { +logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 5e278f8..2154bc6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -158,22 +158,26 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap @Override
kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer
Repository: kylin Updated Branches: refs/heads/master d5d2b9c28 -> 47de9611b KYLIN-2170 fix cleanup() in mapper and reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47de9611 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47de9611 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47de9611 Branch: refs/heads/master Commit: 47de9611be71e3adeebe5da5041b37db5de9fa28 Parents: d5d2b9c Author: Li YangAuthored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 18:56:33 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 39 ++-- .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++- .../engine/mr/steps/InMemCuboidMapper.java | 24 +--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java| 22 --- .../cardinality/ColumnCardinalityReducer.java | 39 ++-- .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++- 8 files changed, 137 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index c8624bb..ecbc6c2 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -176,25 +176,28 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { -outputDistinctValues(col, colValues, context); -colValues.clear(); -} -} else { -//output the hll info; -long grandTotal = 0; -for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { -grandTotal += hll.getCountEstimate(); +try { +if (isStatistics == 
false) { +if (colValues.size() > 0) { +outputDistinctValues(col, colValues, context); +colValues.clear(); +} +} else { +//output the hll info; +long grandTotal = 0; +for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { +grandTotal += hll.getCountEstimate(); +} +double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + +int mapperNumber = baseCuboidRowCountInMappers.size(); + +writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // +cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } -double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - -int mapperNumber = baseCuboidRowCountInMappers.size(); - -writeMapperAndCuboidStatistics(context); // for human check -CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // -cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); +} catch (Throwable ex) { +logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/47de9611/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 86ef487..177c9f6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -197,22 +197,26 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap @Override protected void cleanup(Context context) throws IOException,
kylin git commit: KYLIN-2170 fix cleanup() in mapper and reducer
Repository: kylin Updated Branches: refs/heads/yang21 1720f1eb8 -> 3b07c26a0 KYLIN-2170 fix cleanup() in mapper and reducer Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/3b07c26a Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/3b07c26a Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/3b07c26a Branch: refs/heads/yang21 Commit: 3b07c26a0e633d3d367bce52cd7496cd1e860d0f Parents: 1720f1e Author: Li YangAuthored: Tue Nov 8 18:45:59 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 18:46:21 2016 +0800 -- .../mr/steps/FactDistinctColumnsReducer.java| 39 ++-- .../mr/steps/FactDistinctHiveColumnsMapper.java | 34 ++- .../engine/mr/steps/InMemCuboidMapper.java | 24 +--- .../steps/RowKeyDistributionCheckerMapper.java | 12 ++-- .../cardinality/ColumnCardinalityMapper.java| 22 --- .../cardinality/ColumnCardinalityReducer.java | 39 ++-- .../hbase/steps/RangeKeyDistributionMapper.java | 10 ++- .../steps/RangeKeyDistributionReducer.java | 64 +++- 8 files changed, 137 insertions(+), 107 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java index a7b2e56..a9c5d4b 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsReducer.java @@ -162,25 +162,28 @@ public class FactDistinctColumnsReducer extends KylinReducer 0) { -outputDistinctValues(col, colValues, context); -colValues.clear(); -} -} else { -//output the hll info; -long grandTotal = 0; -for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { -grandTotal += hll.getCountEstimate(); +try { +if (isStatistics == 
false) { +if (!outputTouched || colValues.size() > 0) { +outputDistinctValues(col, colValues, context); +colValues.clear(); +} +} else { +//output the hll info; +long grandTotal = 0; +for (HyperLogLogPlusCounter hll : cuboidHLLMap.values()) { +grandTotal += hll.getCountEstimate(); +} +double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; + +int mapperNumber = baseCuboidRowCountInMappers.size(); + +writeMapperAndCuboidStatistics(context); // for human check + CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // +cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); } -double mapperOverlapRatio = grandTotal == 0 ? 0 : (double) totalRowsBeforeMerge / grandTotal; - -int mapperNumber = baseCuboidRowCountInMappers.size(); - -writeMapperAndCuboidStatistics(context); // for human check -CubeStatsWriter.writeCuboidStatistics(context.getConfiguration(), new Path(statisticsOutput), // -cuboidHLLMap, samplingPercentage, mapperNumber, mapperOverlapRatio); +} catch (Throwable ex) { +logger.error("", ex); } } http://git-wip-us.apache.org/repos/asf/kylin/blob/3b07c26a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java -- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java index 5e278f8..2154bc6 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctHiveColumnsMapper.java @@ -158,22 +158,26 @@ public class FactDistinctHiveColumnsMapper extends FactDistinctColumnsMap @Override protected void
kylin git commit: KYLIN-2162 Improve the cube validation error message
Repository: kylin Updated Branches: refs/heads/v1.6.0-rc1 55bc8fb6e -> 5d166aa8e KYLIN-2162 Improve the cube validation error message Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5d166aa8 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5d166aa8 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5d166aa8 Branch: refs/heads/v1.6.0-rc1 Commit: 5d166aa8ee5270316c076cbb84eb43742ee278aa Parents: 55bc8fb Author: shaofengshiAuthored: Tue Nov 8 17:11:34 2016 +0800 Committer: shaofengshi Committed: Tue Nov 8 17:14:10 2016 +0800 -- .../org/apache/kylin/cube/model/CubeDesc.java | 64 .../validation/rule/AggregationGroupRule.java | 52 +--- .../kylin/cube/AggregationGroupRuleTest.java| 6 +- .../org/apache/kylin/cube/CubeDescTest.java | 14 ++--- 4 files changed, 92 insertions(+), 44 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/5d166aa8/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java -- diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java index 5c73f21..47e9be8 100644 --- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java +++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java @@ -29,6 +29,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.HashMap; +import java.util.HashSet; import java.util.LinkedHashMap; import java.util.LinkedHashSet; import java.util.List; @@ -39,6 +40,7 @@ import java.util.TreeSet; import javax.annotation.Nullable; +import com.google.common.collect.Iterables; import org.apache.commons.codec.binary.Base64; import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.ArrayUtils; @@ -50,6 +52,7 @@ import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.persistence.RootPersistentEntity; import 
org.apache.kylin.common.util.Array; import org.apache.kylin.common.util.JsonUtil; +import org.apache.kylin.common.util.Pair; import org.apache.kylin.measure.MeasureType; import org.apache.kylin.measure.extendedcolumn.ExtendedColumnMeasureType; import org.apache.kylin.metadata.MetadataConstants; @@ -587,8 +590,15 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } if (!includeDims.containsAll(mandatoryDims) || !includeDims.containsAll(hierarchyDims) || !includeDims.containsAll(jointDims)) { -logger.error("Aggregation group " + index + " Include dims not containing all the used dims"); -throw new IllegalStateException("Aggregation group " + index + " Include dims not containing all the used dims"); +List notIncluded = Lists.newArrayList(); +final Iterable all = Iterables.unmodifiableIterable(Iterables.concat(mandatoryDims, hierarchyDims, jointDims)); +for (String dim : all) { +if (includeDims.contains(dim) == false) { +notIncluded.add(dim); +} +} +logger.error("Aggregation group " + index + " Include dimensions not containing all the used dimensions"); +throw new IllegalStateException("Aggregation group " + index + " 'includes' dimensions not include all the dimensions:" + notIncluded.toString()); } Set normalDims = new TreeSet<>(String.CASE_INSENSITIVE_ORDER); @@ -607,33 +617,36 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware { } if (CollectionUtils.containsAny(mandatoryDims, hierarchyDims)) { -logger.warn("Aggregation group " + index + " mandatory dims overlap with hierarchy dims"); +logger.warn("Aggregation group " + index + " mandatory dimensions overlap with hierarchy dimensions: " + CollectionUtils.intersection(mandatoryDims, hierarchyDims)); } if (CollectionUtils.containsAny(mandatoryDims, jointDims)) { -logger.warn("Aggregation group " + index + " mandatory dims overlap with joint dims"); +logger.warn("Aggregation group " + index + " mandatory dimensions overlap with joint dimensions: " + 
CollectionUtils.intersection(mandatoryDims, jointDims)); } if (CollectionUtils.containsAny(hierarchyDims, jointDims)) { -logger.error("Aggregation group " + index + " hierarchy dims overlap with joint dims"); -throw new
kylin git commit: KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig
Repository: kylin Updated Branches: refs/heads/master 637581fb7 -> 9615b4ea6 KYLIN-2169 Refactor AbstractExecutable to respect KylinConfig Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/9615b4ea Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/9615b4ea Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/9615b4ea Branch: refs/heads/master Commit: 9615b4ea6f817606c93df73dcafdcb151f4e8632 Parents: 637581f Author: Li Yang Authored: Tue Nov 8 15:47:33 2016 +0800 Committer: Li Yang Committed: Tue Nov 8 17:02:38 2016 +0800 -- .../kylin/job/common/ShellExecutable.java | 2 +- .../kylin/job/execution/AbstractExecutable.java | 37 +- .../job/execution/DefaultChainedExecutable.java | 1 - .../kylin/job/execution/ExecutableManager.java | 372 ++ .../job/impl/threadpool/DefaultScheduler.java | 2 +- .../kylin/job/manager/ExecutableManager.java| 377 --- .../apache/kylin/job/ExecutableManagerTest.java | 2 +- .../job/impl/threadpool/BaseSchedulerTest.java | 2 +- .../org/apache/kylin/engine/mr/CubingJob.java | 2 +- .../engine/mr/common/MapReduceExecutable.java | 16 +- .../apache/kylin/engine/mr/steps/CuboidJob.java | 2 +- .../kylin/engine/mr/steps/InMemCuboidJob.java | 2 +- .../engine/mr/steps/SaveStatisticsStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterBuildStep.java | 2 +- .../mr/steps/UpdateCubeInfoAfterMergeStep.java | 2 +- .../kylin/provision/BuildCubeWithEngine.java| 2 +- .../kylin/provision/BuildCubeWithStream.java| 2 +- .../apache/kylin/rest/service/BasicService.java | 2 +- .../storage/hbase/util/StorageCleanupJob.java | 2 +- .../apache/kylin/tool/JobInstanceExtractor.java | 2 +- .../apache/kylin/tool/StorageCleanupJob.java| 2 +- 21 files changed, 419 insertions(+), 416 deletions(-) -- http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java -- diff --git
a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java index 111c1ba..a68f242 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java @@ -52,7 +52,7 @@ public class ShellExecutable extends AbstractExecutable { logger.info("executing:" + getCmd()); final ShellExecutableLogger logger = new ShellExecutableLogger(); final Pair result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger); -executableManager.addJobInfo(getId(), logger.getInfo()); +getManager().addJobInfo(getId(), logger.getInfo()); return new ExecuteResult(result.getFirst() == 0 ? ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond()); } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); http://git-wip-us.apache.org/repos/asf/kylin/blob/9615b4ea/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java -- diff --git a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java index 90e4d3c..f7b8a7c 100644 --- a/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/execution/AbstractExecutable.java @@ -32,7 +32,6 @@ import org.apache.kylin.common.util.MailService; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.exception.PersistentException; import org.apache.kylin.job.impl.threadpool.DefaultContext; -import org.apache.kylin.job.manager.ExecutableManager; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,43 +52,51 @@ public abstract class AbstractExecutable implements Executable, Idempotent { protected static final Logger logger = LoggerFactory.getLogger(AbstractExecutable.class); 
protected int retry = 0; +private KylinConfig config; private String name; private String id; private Map params = Maps.newHashMap(); -protected static ExecutableManager executableManager = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv()); - public AbstractExecutable() { setId(UUID.randomUUID().toString()); } + +void initConfig(KylinConfig config) { +