http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java ---------------------------------------------------------------------- diff --cc core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java index 0000000,904c4bd..dd18c91 mode 000000,100644..100644 --- a/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java +++ b/core-job/src/main/java/org/apache/kylin/job/metrics/JobMetricsFacade.java @@@ -1,0 -1,142 +1,142 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.kylin.job.metrics; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.metrics.MetricsManager; + import org.apache.kylin.metrics.lib.impl.RecordEvent; + import org.apache.kylin.metrics.lib.impl.TimedRecordEvent; + import org.apache.kylin.metrics.property.JobPropertyEnum; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class JobMetricsFacade { + private static final Logger logger = LoggerFactory.getLogger(JobMetricsFacade.class); + + public static void updateMetrics(JobStatisticsResult jobStats) { + if (!KylinConfig.getInstanceFromEnv().isKylinMetricsReporterForJobEnabled()) { + return; + } + /** + * report job related metrics + */ + RecordEvent metricsEvent; + if (jobStats.throwable == null) { + metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJob()); + setJobWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId, + jobStats.jobType, jobStats.cubingType); + setJobStats(metricsEvent, jobStats.tableSize, jobStats.cubeSize, jobStats.buildDuration, + jobStats.waitResourceTime, jobStats.perBytesTimeCost, // + jobStats.dColumnDistinct, jobStats.dDictBuilding, jobStats.dCubingInmem, jobStats.dHfileConvert); + } else { + metricsEvent = new TimedRecordEvent(KylinConfig.getInstanceFromEnv().getKylinMetricsSubjectJobException()); + setJobExceptionWrapper(metricsEvent, jobStats.user, jobStats.projectName, jobStats.cubeName, jobStats.jobId, + jobStats.jobType, jobStats.cubingType, // + jobStats.throwable.getClass()); + } + MetricsManager.getInstance().update(metricsEvent); + } + + private static void setJobWrapper(RecordEvent metricsEvent, String user, String projectName, String cubeName, + String jobId, String jobType, String cubingType) { + metricsEvent.put(JobPropertyEnum.USER.toString(), user); + metricsEvent.put(JobPropertyEnum.PROJECT.toString(), projectName); + metricsEvent.put(JobPropertyEnum.CUBE.toString(), 
cubeName); + metricsEvent.put(JobPropertyEnum.ID_CODE.toString(), jobId); + metricsEvent.put(JobPropertyEnum.TYPE.toString(), jobType); + metricsEvent.put(JobPropertyEnum.ALGORITHM.toString(), cubingType); + } + + private static void setJobStats(RecordEvent metricsEvent, long tableSize, long cubeSize, long buildDuration, + long waitResourceTime, double perBytesTimeCost, long dColumnDistinct, long dDictBuilding, long dCubingInmem, + long dHfileConvert) { + metricsEvent.put(JobPropertyEnum.SOURCE_SIZE.toString(), tableSize); + metricsEvent.put(JobPropertyEnum.CUBE_SIZE.toString(), cubeSize); + metricsEvent.put(JobPropertyEnum.BUILD_DURATION.toString(), buildDuration); + metricsEvent.put(JobPropertyEnum.WAIT_RESOURCE_TIME.toString(), waitResourceTime); + metricsEvent.put(JobPropertyEnum.PER_BYTES_TIME_COST.toString(), perBytesTimeCost); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_DISTINCT_COLUMNS.toString(), dColumnDistinct); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_DICTIONARY.toString(), dDictBuilding); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_INMEM_CUBING.toString(), dCubingInmem); + metricsEvent.put(JobPropertyEnum.STEP_DURATION_HFILE_CONVERT.toString(), dHfileConvert); + } + + private static <T extends Throwable> void setJobExceptionWrapper(RecordEvent metricsEvent, String user, + String projectName, String cubeName, String jobId, String jobType, String cubingType, + Class<T> throwableClass) { + setJobWrapper(metricsEvent, user, projectName, cubeName, jobId, jobType, cubingType); + metricsEvent.put(JobPropertyEnum.EXCEPTION.toString(), throwableClass.getName()); + } + + public static class JobStatisticsResult { + // dimensions + private String user; + private String projectName; + private String cubeName; + private String jobId; + private String jobType; + private String cubingType; + + // statistics + private long tableSize; + private long cubeSize; + private long buildDuration; + private long waitResourceTime; + private double 
perBytesTimeCost; + + // step statistics + private long dColumnDistinct = 0L; + private long dDictBuilding = 0L; + private long dCubingInmem = 0L; + private long dHfileConvert = 0L; + + // exception + private Throwable throwable; + + public void setWrapper(String user, String projectName, String cubeName, String jobId, String jobType, + String cubingType) { + this.user = user; - this.projectName = projectName; ++ this.projectName = projectName == null ? null : projectName.toUpperCase(); + this.cubeName = cubeName; + this.jobId = jobId; + this.jobType = jobType; + this.cubingType = cubingType; + } + + public void setJobStats(long tableSize, long cubeSize, long buildDuration, long waitResourceTime, + double perBytesTimeCost) { + this.tableSize = tableSize; + this.cubeSize = cubeSize; + this.buildDuration = buildDuration; + this.waitResourceTime = waitResourceTime; + this.perBytesTimeCost = perBytesTimeCost; + } + + public void setJobStepStats(long dColumnDistinct, long dDictBuilding, long dCubingInmem, long dHfileConvert) { + this.dColumnDistinct = dColumnDistinct; + this.dDictBuilding = dDictBuilding; + this.dCubingInmem = dCubingInmem; + this.dHfileConvert = dHfileConvert; + } + + public void setJobException(Throwable throwable) { + this.throwable = throwable; + } + } + }
http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java ---------------------------------------------------------------------- diff --cc core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java index fd17370,0000000..ee29b13 mode 100644,000000..100644 --- a/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/FiveSecondSucceedTestExecutable.java @@@ -1,46 -1,0 +1,46 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +/** + */ +public class FiveSecondSucceedTestExecutable extends BaseTestExecutable { + + public FiveSecondSucceedTestExecutable() { + super(); + } + + public FiveSecondSucceedTestExecutable(int sleepTime) { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + Thread.sleep(5000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); ++ return ExecuteResult.createSucceed(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java ---------------------------------------------------------------------- diff --cc core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java index 0000000,61b1742..f656c44 mode 000000,100644..100644 --- a/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/RetryableTestExecutable.java @@@ -1,0 -1,50 +1,50 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.job; + + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.job.execution.ExecutableContext; + import org.apache.kylin.job.execution.ExecuteResult; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + */ + public class RetryableTestExecutable extends BaseTestExecutable { + private static final Logger logger = LoggerFactory.getLogger(RetryableTestExecutable.class); + + public RetryableTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) { + logger.debug("run retryable exception test. "); + String[] exceptions = KylinConfig.getInstanceFromEnv().getJobRetryExceptions(); + Throwable ex = null; - if (exceptions != null && exceptions[0] != null) { ++ if (exceptions != null && exceptions.length > 0) { + try { + ex = (Throwable) Class.forName(exceptions[0]).newInstance(); + } catch (Exception e) { + e.printStackTrace(); + } + } + return new ExecuteResult(ExecuteResult.State.ERROR, null, ex); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java ---------------------------------------------------------------------- diff --cc core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java index 89057e6,0000000..1826850 mode 100644,000000..100644 --- a/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java +++ b/core-job/src/test/java/org/apache/kylin/job/RunningTestExecutable.java @@@ -1,39 -1,0 +1,39 @@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +*/ + +package org.apache.kylin.job; + +import org.apache.kylin.job.exception.ExecuteException; +import org.apache.kylin.job.execution.ExecutableContext; +import org.apache.kylin.job.execution.ExecuteResult; + +public class RunningTestExecutable extends BaseTestExecutable { + + public RunningTestExecutable() { + super(); + } + + @Override + protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + } - return new ExecuteResult(ExecuteResult.State.SUCCEED, "succeed"); ++ return ExecuteResult.createSucceed(); + } +} http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java ---------------------------------------------------------------------- diff --cc core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java index b1fc544,badd483..d1b7d96 --- a/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java +++ b/core-job/src/test/java/org/apache/kylin/job/impl/threadpool/DefaultSchedulerTest.java @@@ -31,9 -30,7 +31,10 @@@ import org.apache.kylin.common.KylinCon import org.apache.kylin.job.BaseTestExecutable; import org.apache.kylin.job.ErrorTestExecutable; import org.apache.kylin.job.FailedTestExecutable; +import 
org.apache.kylin.job.FiveSecondSucceedTestExecutable; +import org.apache.kylin.job.NoErrorStatusExecutable; + import org.apache.kylin.job.RetryableTestExecutable; +import org.apache.kylin.job.RunningTestExecutable; import org.apache.kylin.job.SelfStopExecutable; import org.apache.kylin.job.SucceedTestExecutable; import org.apache.kylin.job.execution.DefaultChainedExecutable; @@@ -174,56 -148,18 +175,71 @@@ public class DefaultSchedulerTest exten } @Test + public void tesMetaStoreRecover() throws Exception { + logger.info("tesMetaStoreRecover"); + NoErrorStatusExecutable job = new NoErrorStatusExecutable(); + ErrorTestExecutable task = new ErrorTestExecutable(); + job.addTask(task); + execMgr.addJob(job); + Thread.sleep(2000); + runningJobToError(job.getId()); + Thread.sleep(2000); + Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + } + + @Test + public void testSchedulerStop() throws Exception { + logger.info("testSchedulerStop"); + + thrown.expect(RuntimeException.class); + thrown.expectMessage("too long wait time"); + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable(); + job.addTask(task1); + execMgr.addJob(job); + + //sleep 3s to make sure SucceedTestExecutable is running + Thread.sleep(3000); + //scheduler failed due to some reason + scheduler.shutdown(); + + waitForJobFinish(job.getId(), 6000); + } + + @Test + public void testSchedulerRestart() throws Exception { + logger.info("testSchedulerRestart"); + + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new FiveSecondSucceedTestExecutable(); + job.addTask(task1); + execMgr.addJob(job); + + //sleep 3s to make sure SucceedTestExecutable is running + Thread.sleep(3000); + //scheduler failed due to some reason + scheduler.shutdown(); + //restart + startScheduler(); + + waitForJobFinish(job.getId(), 10000); + Assert.assertEquals(ExecutableState.SUCCEED, 
execMgr.getOutput(job.getId()).getState()); + Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); + } ++ + public void testRetryableException() throws Exception { + System.setProperty("kylin.job.retry-exception-classes", "java.io.FileNotFoundException"); + System.setProperty("kylin.job.retry", "3"); + DefaultChainedExecutable job = new DefaultChainedExecutable(); + BaseTestExecutable task1 = new SucceedTestExecutable(); + BaseTestExecutable task2 = new RetryableTestExecutable(); + job.addTask(task1); + job.addTask(task2); - jobService.addJob(job); - waitForJobFinish(job.getId()); - Assert.assertEquals(ExecutableState.SUCCEED, jobService.getOutput(task1.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(task2.getId()).getState()); - Assert.assertEquals(ExecutableState.ERROR, jobService.getOutput(job.getId()).getState()); ++ execMgr.addJob(job); ++ waitForJobFinish(job.getId(), 10000); ++ Assert.assertEquals(ExecutableState.SUCCEED, execMgr.getOutput(task1.getId()).getState()); ++ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(task2.getId()).getState()); ++ Assert.assertEquals(ExecutableState.ERROR, execMgr.getOutput(job.getId()).getState()); + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java ---------------------------------------------------------------------- diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java index efbc33e,ab55563..f09c47c --- a/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/TableMetadataManager.java @@@ -55,19 -58,51 +55,21 @@@ import com.google.common.collect.Maps */ public class TableMetadataManager { + @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(TableMetadataManager.class); + public static 
final Serializer<TableDesc> TABLE_SERIALIZER = new JsonSerializer<TableDesc>(TableDesc.class); - public static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new JsonSerializer<TableExtDesc>( ++ + private static final Serializer<TableExtDesc> TABLE_EXT_SERIALIZER = new JsonSerializer<TableExtDesc>( TableExtDesc.class); - public static final Serializer<ExternalFilterDesc> EXTERNAL_FILTER_DESC_SERIALIZER = new JsonSerializer<ExternalFilterDesc>( - ExternalFilterDesc.class); - - // static cached instances - private static final ConcurrentMap<KylinConfig, TableMetadataManager> CACHE = new ConcurrentHashMap<KylinConfig, TableMetadataManager>(); public static TableMetadataManager getInstance(KylinConfig config) { - TableMetadataManager r = CACHE.get(config); - if (r != null) { - return r; - } - - synchronized (TableMetadataManager.class) { - r = CACHE.get(config); - if (r != null) { - return r; - } - try { - r = new TableMetadataManager(config); - CACHE.put(config, r); - if (CACHE.size() > 1) { - logger.warn("More than one singleton exist, current keys: {}", StringUtils - .join(Iterators.transform(CACHE.keySet().iterator(), new Function<KylinConfig, String>() { - @Nullable - @Override - public String apply(@Nullable KylinConfig input) { - return String.valueOf(System.identityHashCode(input)); - } - }), ",")); - } - - return r; - } catch (IOException e) { - throw new IllegalStateException("Failed to init TableMetadataManager from " + config, e); - } - } + return config.getManager(TableMetadataManager.class); } - public static void clearCache() { - CACHE.clear(); + // called by reflection + static TableMetadataManager newInstance(KylinConfig config) throws IOException { + return new TableMetadataManager(config); } // ============================================================================ http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/datatype/DataTypeSerializer.java 
---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/DataModelDesc.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java ---------------------------------------------------------------------- diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index fa97596,d8b33c0..7597d40 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@@ -154,20 -146,12 +154,19 @@@ public class FunctionDesc implements Se } public DataType getRewriteFieldType() { - - if (isMax() || isMin()) - return parameter.getColRefs().get(0).getType(); - else if (getMeasureType() instanceof BasicMeasureType) - return returnDataType; - else + if (getMeasureType() instanceof BasicMeasureType) { + if (isMax() || isMin()) { + return parameter.getColRefs().get(0).getType(); + } else if (isSum()) { + return parameter.getColRefs().get(0).getType(); + } else if (isCount()) { + return DataType.getType("bigint"); + } else { + throw new IllegalArgumentException("unknown measure type " + getMeasureType()); + } + } else { return DataType.ANY; + } } public ColumnDesc newFakeRewriteColumn(TableDesc sourceTable) { @@@ -247,10 -231,18 +246,18 @@@ return expression; } ++ public void setExpression(String expression) { ++ this.expression = expression; ++ } ++ public ParameterDesc getParameter() { return parameter; } + public void setParameter(ParameterDesc parameter) { + this.parameter = parameter; + } + - public void setExpression(String expression) { - this.expression = expression; - } - public int getParameterCount() { int count = 0; for (ParameterDesc p = parameter; p != null; p = 
p.getNextParameter()) { http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/PartitionDesc.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/model/Segments.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java ---------------------------------------------------------------------- diff --cc core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java index 0029de2,a7d37e7..9b7aaf2 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/project/ProjectInstance.java @@@ -50,8 -50,7 +50,33 @@@ import com.google.common.collect.Lists public class ProjectInstance extends RootPersistentEntity { public static final String DEFAULT_PROJECT_NAME = "default"; + ++ public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) { ++ ProjectInstance projectInstance = new ProjectInstance(); ++ ++ projectInstance.updateRandomUuid(); ++ projectInstance.setName(name); ++ projectInstance.setOwner(owner); ++ projectInstance.setDescription(description); ++ projectInstance.setStatus(ProjectStatusEnum.ENABLED); ++ projectInstance.setCreateTimeUTC(System.currentTimeMillis()); ++ projectInstance.setOverrideKylinProps(overrideProps); ++ ++ if (realizationEntries != null) ++ projectInstance.setRealizationEntries(realizationEntries); ++ else ++ projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList()); ++ if (models != null) ++ projectInstance.setModels(models); ++ else 
++ projectInstance.setModels(new ArrayList<String>()); ++ return projectInstance; ++ } ++ ++ // ============================================================================ ++ + private KylinConfigExt config; + @JsonProperty("name") private String name; @@@ -87,50 -86,53 +112,44 @@@ @JsonInclude(JsonInclude.Include.NON_NULL) private LinkedHashMap<String, String> overrideKylinProps; - public String getResourcePath() { - return concatResourcePath(resourceName()); - } - private KylinConfigExt config; ++ public void init() { ++ if (name == null) ++ name = ProjectInstance.DEFAULT_PROJECT_NAME; - public static String concatResourcePath(String projectName) { - return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json"; - public String getResourcePath() { - return concatResourcePath(name); -- } ++ if (realizationEntries == null) { ++ realizationEntries = new ArrayList<RealizationEntry>(); ++ } - public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) { - ProjectInstance projectInstance = new ProjectInstance(); - public static String concatResourcePath(String projectName) { - return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + projectName + ".json"; - } ++ if (tables == null) ++ tables = new TreeSet<String>(); - projectInstance.updateRandomUuid(); - projectInstance.setName(name); - projectInstance.setOwner(owner); - projectInstance.setDescription(description); - projectInstance.setStatus(ProjectStatusEnum.ENABLED); - projectInstance.setCreateTimeUTC(System.currentTimeMillis()); - projectInstance.setOverrideKylinProps(overrideProps); - public static String getNormalizedProjectName(String project) { - if (project == null) - throw new IllegalStateException("Trying to normalized a project name which is null"); ++ if (overrideKylinProps == null) { ++ overrideKylinProps = new LinkedHashMap<>(); ++ } - if (realizationEntries != null) 
- projectInstance.setRealizationEntries(realizationEntries); - else - projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList()); - if (models != null) - projectInstance.setModels(models); - else - projectInstance.setModels(new ArrayList<String>()); - return projectInstance; - return project.toUpperCase(); - } ++ initConfig(); + - public static ProjectInstance create(String name, String owner, String description, LinkedHashMap<String, String> overrideProps, List<RealizationEntry> realizationEntries, List<String> models) { - ProjectInstance projectInstance = new ProjectInstance(); ++ if (StringUtils.isBlank(this.name)) ++ throw new IllegalStateException("Project name must not be blank"); + } - public void initConfig() { - projectInstance.updateRandomUuid(); - projectInstance.setName(name); - projectInstance.setOwner(owner); - projectInstance.setDescription(description); - projectInstance.setStatus(ProjectStatusEnum.ENABLED); - projectInstance.setCreateTimeUTC(System.currentTimeMillis()); - if (overrideProps != null) { - projectInstance.setOverrideKylinProps(overrideProps); - } else { - projectInstance.setOverrideKylinProps(new LinkedHashMap<String, String>()); - } - if (realizationEntries != null) - projectInstance.setRealizationEntries(realizationEntries); - else - projectInstance.setRealizationEntries(Lists.<RealizationEntry> newArrayList()); - if (models != null) - projectInstance.setModels(models); - else - projectInstance.setModels(new ArrayList<String>()); - return projectInstance; ++ private void initConfig() { + this.config = KylinConfigExt.createInstance(KylinConfig.getInstanceFromEnv(), this.overrideKylinProps); } -- // ============================================================================ ++ public String getResourcePath() { ++ return concatResourcePath(resourceName()); ++ } -- public ProjectInstance() { ++ public static String concatResourcePath(String projectName) { ++ return ResourceStore.PROJECT_RESOURCE_ROOT + "/" + 
projectName + ".json"; } + @Override + public String resourceName() { + return this.name; + } + public String getDescription() { return description; } @@@ -310,31 -333,6 +329,10 @@@ return config; } + public void setConfig(KylinConfigExt config) { + this.config = config; + } + - public void init() { - if (name == null) - name = ProjectInstance.DEFAULT_PROJECT_NAME; - - if (realizationEntries == null) { - realizationEntries = new ArrayList<RealizationEntry>(); - } - - if (tables == null) - tables = new TreeSet<String>(); - - if (overrideKylinProps == null) { - overrideKylinProps = new LinkedHashMap<>(); - } - - initConfig(); - - if (StringUtils.isBlank(this.name)) - throw new IllegalStateException("Project name must not be blank"); - } - @Override public String toString() { return "ProjectDesc [name=" + name + "]"; http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/CubeScanRangePlanner.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java ---------------------------------------------------------------------- diff --cc core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java index b910ffe,024990f..11ad8bb --- a/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java +++ b/core-storage/src/main/java/org/apache/kylin/storage/gtrecord/GTCubeStorageQueryBase.java @@@ -276,9 -255,44 +281,25 @@@ public abstract class GTCubeStorageQuer } } } - - // expand derived - Set<TblColRef> resultD = Sets.newHashSet(); - for (TblColRef col : result) { - if (cubeDesc.isExtendedColumn(col)) { - throw new CubeDesc.CannotFilterExtendedColumnException(col); - } - if (cubeDesc.isDerived(col)) { - DeriveInfo hostInfo = cubeDesc.getHostInfo(col); - if (hostInfo.isOneToOne) { - for (TblColRef 
hostCol : hostInfo.columns) { - resultD.add(hostCol); - } - } - //if not one2one, it will be pruned - } else { - resultD.add(col); - } - } - return resultD; + return result; } + private long getQueryFilterMask(Set<TblColRef> filterColumnD) { + long filterMask = 0; + + logger.info("Filter column set for query: " + filterColumnD.toString()); + if (filterColumnD.size() != 0) { + RowKeyColDesc[] allColumns = cubeDesc.getRowkey().getRowKeyColumns(); + for (int i = 0; i < allColumns.length; i++) { + if (filterColumnD.contains(allColumns[i].getColRef())) { + filterMask |= 1L << allColumns[i].getBitIndex(); + } + } + } + logger.info("Filter mask is: " + filterMask); + return filterMask; + } + public boolean isNeedStorageAggregation(Cuboid cuboid, Collection<TblColRef> groupD, Collection<TblColRef> singleValueD) { HashSet<TblColRef> temp = Sets.newHashSet(); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 8e21f5c,8fbc0c9..faac724 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@@ -18,7 -18,9 +18,10 @@@ package org.apache.kylin.engine.mr; -import org.apache.kylin.cube.CubeManager; ++import java.util.List; ++ import org.apache.kylin.cube.CubeSegment; + import org.apache.kylin.cube.cuboid.CuboidUtil; import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide; import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; @@@ -57,7 -62,12 +61,12 @@@ public class BatchCubingJobBuilder2 ext inputSide.addStepPhase1_CreateFlatTable(result); // Phase 2: Build Dictionary - result.addTask(createFactDistinctColumnsStepWithStats(jobId)); + 
result.addTask(createFactDistinctColumnsStep(jobId)); + + if (isEnableUHCDictStep()) { + result.addTask(createBuildUHCDictStep(jobId)); + } + result.addTask(createBuildDictionaryStep(jobId)); result.addTask(createSaveStatisticsStep(jobId)); outputSide.addStepPhase2_BuildDictionary(result); @@@ -75,8 -85,22 +84,22 @@@ return result; } + private boolean isEnableUHCDictStep() { + if (!config.getConfig().isBuildUHCDictWithMREnabled()) { + return false; + } + - List<TblColRef> uhcColumns = CubeManager.getInstance(config.getConfig()).getAllUHCColumns(seg.getCubeDesc()); ++ List<TblColRef> uhcColumns = seg.getCubeDesc().getAllUHCColumns(); + if (uhcColumns.size() == 0) { + return false; + } + + return true; + } + protected void addLayerCubingSteps(final CubingJob result, final String jobId, final String cuboidRootPath) { - final int maxLevel = seg.getCuboidScheduler().getBuildLevel(); + // Don't know statistics so that tree cuboid scheduler is not determined. Determine the maxLevel at runtime + final int maxLevel = CuboidUtil.getLongestDepth(seg.getCuboidScheduler().getAllCuboidIds()); // base cuboid step result.addTask(createBaseCuboidStep(getCuboidOutputPathsByLevel(cuboidRootPath, 0), jobId)); // n dim cuboid steps http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java index 6393abf,abf7e02..6f26c35 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/CubingJob.java @@@ -99,9 -142,11 +142,11 @@@ public class CubingJob extends DefaultC format.setTimeZone(TimeZone.getTimeZone(config.getTimeZone())); result.setDeployEnvName(kylinConfig.getDeployEnv()); result.setProjectName(projList.get(0).getName()); + result.setJobType(jobType); 
CubingExecutableUtil.setCubeName(seg.getCubeInstance().getName(), result.getParams()); CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams()); + CubingExecutableUtil.setSegmentName(seg.getName(), result.getParams()); - result.setName(jobType + " CUBE - " + seg.getCubeInstance().getName() + " - " + seg.getName() + " - " + result.setName(jobType + " CUBE - " + seg.getCubeInstance().getDisplayName() + " - " + seg.getName() + " - " + format.format(new Date(System.currentTimeMillis()))); result.setSubmitter(submitter); result.setNotifyList(seg.getCubeInstance().getDescriptor().getNotifyList()); @@@ -197,6 -259,44 +259,44 @@@ super.onExecuteFinished(result, executableContext); } + protected void onStatusChange(ExecutableContext context, ExecuteResult result, ExecutableState state) { + super.onStatusChange(context, result, state); + + updateMetrics(context, result, state); + } + + protected void updateMetrics(ExecutableContext context, ExecuteResult result, ExecutableState state) { + JobMetricsFacade.JobStatisticsResult jobStats = new JobMetricsFacade.JobStatisticsResult(); - jobStats.setWrapper(getSubmitter(), ProjectInstance.getNormalizedProjectName(getProjectName()), ++ jobStats.setWrapper(getSubmitter(), getProjectName(), + CubingExecutableUtil.getCubeName(getParams()), getId(), getJobType(), + getAlgorithm() == null ? 
"NULL" : getAlgorithm().toString()); + + if (state == ExecutableState.SUCCEED) { + jobStats.setJobStats(findSourceSizeBytes(), findCubeSizeBytes(), getDuration(), getMapReduceWaitTime(), + getPerBytesTimeCost(findSourceSizeBytes(), getDuration())); + if (CubingJobTypeEnum.getByName(getJobType()) == CubingJobTypeEnum.BUILD) { + jobStats.setJobStepStats( + getTaskByName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_BUILD_DICTIONARY).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_BUILD_IN_MEM_CUBE).getDuration(), + getTaskByName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE).getDuration()); + } + } else if (state == ExecutableState.ERROR) { + jobStats.setJobException(result.getThrowable() != null ? result.getThrowable() : new Exception()); + } + JobMetricsFacade.updateMetrics(jobStats); + } + + private static double getPerBytesTimeCost(long size, long timeCost) { + if (size <= 0) { + return 0; + } + if (size < MIN_SOURCE_SIZE) { + size = MIN_SOURCE_SIZE; + } + return timeCost * 1.0 / size; + } + /** * build fail because the metadata store has problem. 
* @param exception http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java index 2c0f9f6,ade07e9..8925a8e --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java @@@ -462,7 -468,9 +471,12 @@@ public abstract class AbstractHadoopJo } } + public static KylinConfig loadKylinConfigFromHdfs(SerializableConfiguration conf, String uri) { + HadoopUtil.setCurrentConfiguration(conf.get()); ++ return loadKylinConfigFromHdfs(uri); ++ } + + public static KylinConfig loadKylinConfigFromHdfs(String uri) { if (uri == null) throw new IllegalArgumentException("meta url should not be null"); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 52b6af5,50c589a..64163ad --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@@ -66,6 -68,9 +68,9 @@@ public interface BatchConstants String CFG_OUTPUT_PARTITION = "partition"; String CFG_MR_SPARK_JOB = "mr.spark.job"; String CFG_SPARK_META_URL = "spark.meta.url"; + String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir"; + - String CFG_HLL_SHARD_BASE = "mapreduce.partition.hll.shard.base"; ++ String CFG_HLL_REDUCER_NUM = 
"cuboidHLLCounterReducerNum"; /** * command line ARGuments http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index fcd3162,8b9b928..3c054a3 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@@ -40,6 -40,6 +40,7 @@@ import org.apache.hadoop.io.SequenceFil import org.apache.hadoop.io.SequenceFile.Reader.Option; import org.apache.hadoop.util.ReflectionUtils; import org.apache.kylin.common.KylinConfig; ++import org.apache.kylin.common.persistence.RawResource; import org.apache.kylin.common.persistence.ResourceStore; import org.apache.kylin.common.util.ByteArray; import org.apache.kylin.common.util.Bytes; @@@ -80,51 -80,50 +81,54 @@@ public class CubeStatsReader final CuboidScheduler cuboidScheduler; public CubeStatsReader(CubeSegment cubeSegment, KylinConfig kylinConfig) throws IOException { + this(cubeSegment, cubeSegment.getCuboidScheduler(), kylinConfig); + } + + /** + * @param cuboidScheduler if it's null, part of its functions will not be supported + */ + public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig) + throws IOException { ResourceStore store = ResourceStore.getStore(kylinConfig); - cuboidScheduler = cubeSegment.getCuboidScheduler(); String statsKey = cubeSegment.getStatisticsResourcePath(); -- File tmpSeqFile = writeTmpSeqFile(store.getResource(statsKey).inputStream); - Reader reader = null; - - try { - Configuration hadoopConf = HadoopUtil.getCurrentConfiguration(); - - Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); - Option seqInput = SequenceFile.Reader.file(path); - reader = new
SequenceFile.Reader(hadoopConf, seqInput); - - int percentage = 100; - int mapperNumber = 0; - double mapperOverlapRatio = 0; - Map<Long, HLLCounter> counterMap = Maps.newHashMap(); - - LongWritable key = (LongWritable) ReflectionUtils.newInstance(reader.getKeyClass(), hadoopConf); - BytesWritable value = (BytesWritable) ReflectionUtils.newInstance(reader.getValueClass(), hadoopConf); - while (reader.next(key, value)) { - if (key.get() == 0L) { - percentage = Bytes.toInt(value.getBytes()); - } else if (key.get() == -1) { - mapperOverlapRatio = Bytes.toDouble(value.getBytes()); - } else if (key.get() == -2) { - mapperNumber = Bytes.toInt(value.getBytes()); - } else if (key.get() > 0) { - HLLCounter hll = new HLLCounter(kylinConfig.getCubeStatsHLLPrecision()); - ByteArray byteArray = new ByteArray(value.getBytes()); - hll.readRegisters(byteArray.asBuffer()); - counterMap.put(key.get(), hll); - } - } - - this.seg = cubeSegment; - this.samplingPercentage = percentage; - this.mapperNumberOfFirstBuild = mapperNumber; - this.mapperOverlapRatioOfFirstBuild = mapperOverlapRatio; - this.cuboidRowEstimatesHLL = counterMap; ++ RawResource resource = store.getResource(statsKey); ++ if (resource == null) ++ throw new IllegalStateException("Missing resource at " + statsKey); ++ ++ File tmpSeqFile = writeTmpSeqFile(resource.inputStream); + Path path = new Path(HadoopUtil.fixWindowsPath("file://" + tmpSeqFile.getAbsolutePath())); + + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); + tmpSeqFile.delete(); + + this.seg = cubeSegment; + this.cuboidScheduler = cuboidScheduler; + this.samplingPercentage = cubeStatsResult.getPercentage(); + this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); + this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); + } - } finally { - IOUtils.closeStream(reader); - tmpSeqFile.delete(); - } + /** + * 
Read statistics from + * @param path + * rather than + * @param cubeSegment + * + * Since the statistics are from + * @param path + * cuboid scheduler should be provided by default + */ + public CubeStatsReader(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, KylinConfig kylinConfig, Path path) + throws IOException { + CubeStatsResult cubeStatsResult = new CubeStatsResult(path, kylinConfig.getCubeStatsHLLPrecision()); + + this.seg = cubeSegment; + this.cuboidScheduler = cuboidScheduler; + this.samplingPercentage = cubeStatsResult.getPercentage(); + this.mapperNumberOfFirstBuild = cubeStatsResult.getMapperNumber(); + this.mapperOverlapRatioOfFirstBuild = cubeStatsResult.getMapperOverlapRatio(); + this.cuboidRowEstimatesHLL = cubeStatsResult.getCounterMap(); } private File writeTmpSeqFile(InputStream inputStream) throws IOException { http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java index 0000000,1809ff0..0e56287 mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CuboidSchedulerUtil.java @@@ -1,0 -1,54 +1,60 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.common; + + import java.io.IOException; + import java.util.Comparator; + import java.util.Map; + import java.util.Set; + + import org.apache.kylin.cube.CubeSegment; + import org.apache.kylin.cube.cuboid.Cuboid; + import org.apache.kylin.cube.cuboid.CuboidModeEnum; + import org.apache.kylin.cube.cuboid.CuboidScheduler; + import org.apache.kylin.cube.cuboid.TreeCuboidScheduler; + + import com.google.common.collect.Lists; + + public class CuboidSchedulerUtil { + + public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, String cuboidModeName) { - return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName)); ++ if (cuboidModeName == null) ++ return segment.getCuboidScheduler(); ++ else ++ return getCuboidSchedulerByMode(segment, CuboidModeEnum.getByModeName(cuboidModeName)); + } + + public static CuboidScheduler getCuboidSchedulerByMode(CubeSegment segment, CuboidModeEnum cuboidMode) { - return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode)); ++ if (cuboidMode == CuboidModeEnum.CURRENT || cuboidMode == null) ++ return segment.getCuboidScheduler(); ++ else ++ return getCuboidScheduler(segment, segment.getCubeInstance().getCuboidsByMode(cuboidMode)); + } + + public static CuboidScheduler getCuboidScheduler(CubeSegment segment, Set<Long> cuboidSet) { + try { + Map<Long, Long> cuboidsWithRowCnt = CuboidStatsReaderUtil.readCuboidStatsFromSegment(cuboidSet, segment); + Comparator<Long> comparator = cuboidsWithRowCnt == null ? 
Cuboid.cuboidSelectComparator + : new TreeCuboidScheduler.CuboidCostComparator(cuboidsWithRowCnt); + return new TreeCuboidScheduler(segment.getCubeDesc(), Lists.newArrayList(cuboidSet), comparator); + } catch (IOException e) { + throw new RuntimeException("Fail to cube stats for segment" + segment + " due to " + e); + } + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java index ad13425,a230517..ad8b954 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/JobInfoConverter.java @@@ -59,23 -67,13 +70,17 @@@ public class JobInfoConverter return null; } + CubingJob cubeJob = (CubingJob) job; - + CubeInstance cube = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()) + .getCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); + - Output output = outputs.get(job.getId()); final JobInstance result = new JobInstance(); result.setName(job.getName()); - if (cube != null) { - result.setRelatedCube(cube.getDisplayName()); - } else { - result.setRelatedCube(CubingExecutableUtil.getCubeName(cubeJob.getParams())); - } - result.setRelatedCube(CubingExecutableUtil.getCubeName(job.getParams())); - result.setRelatedSegment(CubingExecutableUtil.getSegmentId(job.getParams())); ++ result.setRelatedCube(cube != null ? 
cube.getDisplayName() : CubingExecutableUtil.getCubeName(cubeJob.getParams())); + result.setRelatedSegment(CubingExecutableUtil.getSegmentId(cubeJob.getParams())); result.setLastModified(output.getLastModified()); - result.setSubmitter(cubeJob.getSubmitter()); - result.setUuid(cubeJob.getId()); + result.setSubmitter(job.getSubmitter()); + result.setUuid(job.getId()); result.setType(CubeBuildTypeEnum.BUILD); result.setStatus(parseToJobStatus(output.getState())); result.setMrWaiting(AbstractExecutable.getExtraInfoAsLong(output, CubingJob.MAP_REDUCE_WAIT_TIME, 0L) / 1000); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java index 0000000,b249f12..8fc26b4 mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/MapReduceUtil.java @@@ -1,0 -1,131 +1,132 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + + package org.apache.kylin.engine.mr.common; + + import java.io.IOException; + import java.util.Map; + + import org.apache.hadoop.mapreduce.Reducer; + import org.apache.kylin.common.KylinConfig; ++import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.cube.CubeSegment; + import org.apache.kylin.cube.cuboid.CuboidScheduler; + import org.apache.kylin.cube.model.CubeDesc; + import org.apache.kylin.job.exception.JobException; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class MapReduceUtil { + + private static final Logger logger = LoggerFactory.getLogger(MapReduceUtil.class); + + /** + * @return reducer number for calculating hll + */ - public static int getHLLShardBase(CubeSegment segment) { - int nCuboids = segment.getCuboidScheduler().getAllCuboidIds().size(); - int shardBase = (nCuboids - 1) / segment.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1; ++ public static int getCuboidHLLCounterReducerNum(CubeInstance cube) { ++ int nCuboids = cube.getCuboidScheduler().getAllCuboidIds().size(); ++ int shardBase = (nCuboids - 1) / cube.getConfig().getHadoopJobPerReducerHLLCuboidNumber() + 1; + - int hllMaxReducerNumber = segment.getConfig().getHadoopJobHLLMaxReducerNumber(); ++ int hllMaxReducerNumber = cube.getConfig().getHadoopJobHLLMaxReducerNumber(); + if (shardBase > hllMaxReducerNumber) { + shardBase = hllMaxReducerNumber; + } + return shardBase; + } + + /** + * @param cuboidScheduler specified can provide more flexibility + * */ + public static int getLayeredCubingReduceTaskNum(CubeSegment cubeSegment, CuboidScheduler cuboidScheduler, + double totalMapInputMB, int level) + throws ClassNotFoundException, IOException, InterruptedException, JobException { + CubeDesc cubeDesc = cubeSegment.getCubeDesc(); + KylinConfig kylinConfig = cubeDesc.getConfig(); + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + 
logger.info("Having per reduce MB " + perReduceInputMB + ", reduce count ratio " + reduceCountRatio + ", level " + + level); + + CubeStatsReader cubeStatsReader = new CubeStatsReader(cubeSegment, cuboidScheduler, kylinConfig); + + double parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst; + + if (level == -1) { + //merge case + double estimatedSize = cubeStatsReader.estimateCubeSize(); + adjustedCurrentLayerSizeEst = estimatedSize > totalMapInputMB ? totalMapInputMB : estimatedSize; + logger.debug("estimated size {}, input size {}, adjustedCurrentLayerSizeEst: {}", estimatedSize, + totalMapInputMB, adjustedCurrentLayerSizeEst); + } else if (level == 0) { + //base cuboid case TODO: the estimation could be very WRONG because it has no correction + adjustedCurrentLayerSizeEst = cubeStatsReader.estimateLayerSize(0); + logger.debug("adjustedCurrentLayerSizeEst: {}", adjustedCurrentLayerSizeEst); + } else { + parentLayerSizeEst = cubeStatsReader.estimateLayerSize(level - 1); + currentLayerSizeEst = cubeStatsReader.estimateLayerSize(level); + adjustedCurrentLayerSizeEst = totalMapInputMB / parentLayerSizeEst * currentLayerSizeEst; + logger.debug( + "totalMapInputMB: {}, parentLayerSizeEst: {}, currentLayerSizeEst: {}, adjustedCurrentLayerSizeEst: {}", + totalMapInputMB, parentLayerSizeEst, currentLayerSizeEst, adjustedCurrentLayerSizeEst); + } + + // number of reduce tasks + int numReduceTasks = (int) Math.round(adjustedCurrentLayerSizeEst / perReduceInputMB * reduceCountRatio + 0.99); + + // adjust reducer number for cube which has DISTINCT_COUNT measures for better performance + if (cubeDesc.hasMemoryHungryMeasures()) { + logger.debug("Multiply reducer num by 4 to boost performance for memory hungry measures"); + numReduceTasks = numReduceTasks * 4; + } + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = 
Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + return numReduceTasks; + } + + public static int getInmemCubingReduceTaskNum(CubeSegment cubeSeg, CuboidScheduler cuboidScheduler) + throws IOException { + KylinConfig kylinConfig = cubeSeg.getConfig(); + + Map<Long, Double> cubeSizeMap = new CubeStatsReader(cubeSeg, cuboidScheduler, kylinConfig).getCuboidSizeMap(); + double totalSizeInM = 0; + for (Double cuboidSize : cubeSizeMap.values()) { + totalSizeInM += cuboidSize; + } + + double perReduceInputMB = kylinConfig.getDefaultHadoopJobReducerInputMB(); + double reduceCountRatio = kylinConfig.getDefaultHadoopJobReducerCountRatio(); + + // number of reduce tasks + int numReduceTasks = (int) Math.round(totalSizeInM / perReduceInputMB * reduceCountRatio); + + // at least 1 reducer by default + numReduceTasks = Math.max(kylinConfig.getHadoopJobMinReducerNumber(), numReduceTasks); + // no more than 500 reducer by default + numReduceTasks = Math.min(kylinConfig.getHadoopJobMaxReducerNumber(), numReduceTasks); + + logger.info("Having total map input MB " + Math.round(totalSizeInM)); + logger.info("Having per reduce MB " + perReduceInputMB); + logger.info("Setting " + Reducer.Context.NUM_REDUCES + "=" + numReduceTasks); + return numReduceTasks; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java index 0f7281f,4efcb96..09db8e9 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/StatisticsDecisionUtil.java @@@ -107,8 -101,30 +101,30 @@@ public class StatisticsDecisionUtil return; } + CubeInstance cube = segment.getCubeInstance(); - CubeUpdate cubeBuilder = new CubeUpdate(cube); - 
cubeBuilder.setCuboids(recommendCuboidsWithStats); - CubeManager.getInstance(cube.getConfig()).updateCube(cubeBuilder); + CubeUpdate update = new CubeUpdate(cube.latestCopyForWrite()); + update.setCuboids(recommendCuboidsWithStats); + CubeManager.getInstance(cube.getConfig()).updateCube(update); } + + public static boolean isAbleToOptimizeCubingPlan(CubeSegment segment) { + CubeInstance cube = segment.getCubeInstance(); + if (!cube.getConfig().isCubePlannerEnabled()) + return false; + + if (cube.getSegments(SegmentStatusEnum.READY_PENDING).size() > 0) { + logger.info("Has read pending segments and will not enable cube planner."); + return false; + } + List<CubeSegment> readySegments = cube.getSegments(SegmentStatusEnum.READY); + List<CubeSegment> newSegments = cube.getSegments(SegmentStatusEnum.NEW); + if (newSegments.size() <= 1 && // + (readySegments.size() == 0 || // + (cube.getConfig().isCubePlannerEnabledForExistingCube() && readySegments.size() == 1 + && readySegments.get(0).getSegRange().equals(segment.getSegRange())))) { + return true; + } else { + return false; + } + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java index 0000000,8f64272..f3bdabd mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidJob.java @@@ -1,0 -1,120 +1,120 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.steps; + + import java.io.IOException; + + import org.apache.commons.cli.Options; + import org.apache.hadoop.fs.Path; + import org.apache.hadoop.io.NullWritable; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.mapreduce.Job; + import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat; + import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; + import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat; + import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.cube.CubeInstance; + import org.apache.kylin.cube.CubeManager; + import org.apache.kylin.cube.CubeSegment; + import org.apache.kylin.engine.mr.common.AbstractHadoopJob; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.apache.kylin.engine.mr.common.MapReduceUtil; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + public class CalculateStatsFromBaseCuboidJob extends AbstractHadoopJob { + + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidJob.class); + + @Override + public int run(String[] args) throws Exception { + Options options = new Options(); + + try { + options.addOption(OPTION_JOB_NAME); + options.addOption(OPTION_CUBE_NAME); + options.addOption(OPTION_SEGMENT_ID); + 
options.addOption(OPTION_INPUT_PATH); + options.addOption(OPTION_OUTPUT_PATH); + options.addOption(OPTION_STATISTICS_SAMPLING_PERCENT); + options.addOption(OPTION_CUBOID_MODE); + parseOptions(options, args); + + job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME)); + String cubeName = getOptionValue(OPTION_CUBE_NAME); + String segmentID = getOptionValue(OPTION_SEGMENT_ID); + Path input = new Path(getOptionValue(OPTION_INPUT_PATH)); + Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH)); + String statistics_sampling_percent = getOptionValue(OPTION_STATISTICS_SAMPLING_PERCENT); + String cuboidMode = getOptionValue(OPTION_CUBOID_MODE); + + CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); + CubeInstance cube = cubeMgr.getCube(cubeName); + CubeSegment cubeSegment = cube.getSegmentById(segmentID); + + job.getConfiguration().set(BatchConstants.CFG_CUBOID_MODE, cuboidMode); + job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); + job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); + job.getConfiguration().set(BatchConstants.CFG_STATISTICS_SAMPLING_PERCENT, statistics_sampling_percent); + logger.info("Starting: " + job.getJobName()); + + setJobClasspath(job, cube.getConfig()); + + setupMapper(input); + setupReducer(output, cubeSegment); + + attachSegmentMetadataWithDict(cubeSegment, job.getConfiguration()); + + return waitForCompletion(job); + + } catch (Exception e) { + logger.error("error in CalculateStatsFromBaseCuboidJob", e); + printUsage(options); + throw e; + } finally { + if (job != null) + cleanupTempConfFile(job.getConfiguration()); + } + } + + private void setupMapper(Path input) throws IOException { + FileInputFormat.setInputPaths(job, input); + job.setMapperClass(CalculateStatsFromBaseCuboidMapper.class); + job.setInputFormatClass(SequenceFileInputFormat.class); + job.setMapOutputKeyClass(Text.class); + job.setMapOutputValueClass(Text.class); + } + + private void 
setupReducer(Path output, CubeSegment cubeSeg) throws IOException { - int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg); - job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase); ++ int hllShardBase = MapReduceUtil.getCuboidHLLCounterReducerNum(cubeSeg.getCubeInstance()); ++ job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, hllShardBase); + + job.setReducerClass(CalculateStatsFromBaseCuboidReducer.class); + job.setOutputFormatClass(SequenceFileOutputFormat.class); + job.setOutputKeyClass(NullWritable.class); + job.setOutputValueClass(Text.class); + job.setNumReduceTasks(hllShardBase); + + FileOutputFormat.setOutputPath(job, output); + job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); + + deletePath(job.getConfiguration(), output); + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java index 0000000,70db21b..8b84844 mode 000000,100644..100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CalculateStatsFromBaseCuboidPartitioner.java @@@ -1,0 -1,59 +1,59 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + package org.apache.kylin.engine.mr.steps; + + import org.apache.hadoop.conf.Configurable; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.io.Text; + import org.apache.hadoop.mapreduce.Partitioner; + import org.apache.kylin.common.util.Bytes; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + + /** + */ + public class CalculateStatsFromBaseCuboidPartitioner extends Partitioner<Text, Text> implements Configurable { + private static final Logger logger = LoggerFactory.getLogger(CalculateStatsFromBaseCuboidPartitioner.class); + + private Configuration conf; + private int hllShardBase = 1; + + @Override + public int getPartition(Text key, Text value, int numReduceTasks) { + Long cuboidId = Bytes.toLong(key.getBytes()); + int shard = cuboidId.hashCode() % hllShardBase; + if (shard < 0) { + shard += hllShardBase; + } + return numReduceTasks - shard - 1; + } + + @Override + public void setConf(Configuration conf) { + this.conf = conf; - hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1); ++ hllShardBase = conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1); + logger.info("shard base for hll is " + hllShardBase); + } + + @Override + public Configuration getConf() { + return conf; + } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java ---------------------------------------------------------------------- diff --cc 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java index 98ebbb4,d64d300..a457677 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CreateDictionaryJob.java @@@ -72,7 -77,16 +77,16 @@@ public class CreateDictionaryJob extend @Override public Dictionary<String> getDictionary(TblColRef col) throws IOException { - Path colDir = new Path(factColumnsInputPath, col.getIdentity()); + CubeManager cubeManager = CubeManager.getInstance(config); + CubeInstance cube = cubeManager.getCube(cubeName); - List<TblColRef> uhcColumns = CubeManager.getInstance(config).getAllUHCColumns(cube.getDescriptor()); ++ List<TblColRef> uhcColumns = cube.getDescriptor().getAllUHCColumns(); + + Path colDir; + if (uhcColumns.contains(col)) { + colDir = new Path(dictPath, col.getIdentity()); + } else { + colDir = new Path(factColumnsInputPath, col.getIdentity()); + } FileSystem fs = HadoopUtil.getWorkingFileSystem(); Path dictFile = HadoopUtil.getFilterOnlyPath(fs, colDir, col.getName() + FactDistinctColumnsReducer.DICT_FILE_POSTFIX); http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java index a1bba6e,e06077a..b48b19b --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CubingExecutableUtil.java @@@ -38,7 -38,7 +38,8 @@@ import com.google.common.collect.Lists public class CubingExecutableUtil { public static final String CUBE_NAME = "cubeName"; + public static final String DISPALY_NAME = "displayName"; + public static final String SEGMENT_NAME = "segmentName"; public static final String SEGMENT_ID = "segmentId"; public 
static final String MERGING_SEGMENT_IDS = "mergingSegmentIds"; public static final String STATISTICS_PATH = "statisticsPath"; http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java ---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java index 5fcfe42,141ca99..9bede82 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnPartitioner.java @@@ -18,25 -18,52 +18,62 @@@ package org.apache.kylin.engine.mr.steps; ++import java.io.IOException; ++ + import org.apache.hadoop.conf.Configurable; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Partitioner; ++import org.apache.kylin.common.KylinConfig; + import org.apache.kylin.common.util.Bytes; import org.apache.kylin.common.util.BytesUtil; ++import org.apache.kylin.cube.CubeInstance; ++import org.apache.kylin.cube.CubeManager; ++import org.apache.kylin.engine.mr.common.AbstractHadoopJob; + import org.apache.kylin.engine.mr.common.BatchConstants; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; /** */ - public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> { + public class FactDistinctColumnPartitioner extends Partitioner<SelfDefineSortableKey, Text> implements Configurable { + private static final Logger logger = LoggerFactory.getLogger(FactDistinctColumnPartitioner.class); + + private Configuration conf; - private int hllShardBase = 1; ++ private FactDistinctColumnsReducerMapping reducerMapping; ++ ++ @Override ++ public void setConf(Configuration conf) { ++ this.conf = conf; ++ ++ KylinConfig config; ++ try { ++ config = AbstractHadoopJob.loadKylinPropsAndMetadata(); ++ } 
catch (IOException e) { ++ throw new RuntimeException(e); ++ } ++ String cubeName = conf.get(BatchConstants.CFG_CUBE_NAME); ++ CubeInstance cube = CubeManager.getInstance(config).getCube(cubeName); ++ ++ reducerMapping = new FactDistinctColumnsReducerMapping(cube, ++ conf.getInt(BatchConstants.CFG_HLL_REDUCER_NUM, 1)); ++ } @Override public int getPartition(SelfDefineSortableKey skey, Text value, int numReduceTasks) { Text key = skey.getText(); -- if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_HLL) { - // the last reducer is for merging hll - return numReduceTasks - 1; - } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) { - // the last but one reducer is for partition col - return numReduceTasks - 2; - // the last $hllShard reducers are for merging hll ++ if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_HLL_COUNTER) { + Long cuboidId = Bytes.toLong(key.getBytes(), 1, Bytes.SIZEOF_LONG); - int shard = cuboidId.hashCode() % hllShardBase; - if (shard < 0) { - shard += hllShardBase; - } - return numReduceTasks - shard - 1; - } else if (key.getBytes()[0] == FactDistinctColumnsMapper.MARK_FOR_PARTITION_COL) { - // the last but one reducer is for partition col - return numReduceTasks - hllShardBase - 1; ++ return reducerMapping.getReducerIdForCuboidRowCount(cuboidId); ++ } else if (key.getBytes()[0] == FactDistinctColumnsReducerMapping.MARK_FOR_PARTITION_COL) { ++ return reducerMapping.getReducerIdForDatePartitionColumn(); } else { return BytesUtil.readUnsigned(key.getBytes(), 0, 1); } } + + @Override - public void setConf(Configuration conf) { - this.conf = conf; - hllShardBase = conf.getInt(BatchConstants.CFG_HLL_SHARD_BASE, 1); - logger.info("shard base for hll is " + hllShardBase); - } - - @Override + public Configuration getConf() { + return conf; + } } http://git-wip-us.apache.org/repos/asf/kylin/blob/4d50b269/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java 
---------------------------------------------------------------------- diff --cc engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java index ac8ce26,5200950..cc4f260 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/FactDistinctColumnsJob.java @@@ -19,7 -19,7 +19,6 @@@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; --import java.util.Set; import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; @@@ -42,7 -42,8 +41,6 @@@ import org.apache.kylin.engine.mr.IMRIn import org.apache.kylin.engine.mr.MRUtil; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.apache.kylin.engine.mr.common.BatchConstants; -import org.apache.kylin.engine.mr.common.MapReduceUtil; --import org.apache.kylin.metadata.model.TblColRef; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@@ -79,21 -82,21 +77,6 @@@ public class FactDistinctColumnsJob ext // add metadata to distributed cache CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = cubeMgr.getCube(cubeName); -- Set<TblColRef> columnsNeedDict = cube.getDescriptor().getAllColumnsNeedDictionaryBuilt(); -- -- int reducerCount = columnsNeedDict.size(); -- int uhcReducerCount = cube.getConfig().getUHCReducerCount(); -- -- int[] uhcIndex = cubeMgr.getUHCIndex(cube.getDescriptor()); -- for (int index : uhcIndex) { -- if (index == 1) { -- reducerCount += uhcReducerCount - 1; -- } -- } -- -- if (reducerCount > 255) { -- throw new IllegalArgumentException("The max reducer number for FactDistinctColumnsJob is 255, but now it is " + reducerCount + ", decrease 'kylin.engine.mr.uhc-reducer-count'"); -- } job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName); job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID); @@@ -114,7 -118,7 +97,7 @@@ } setupMapper(segment); - 
setupReducer(output, reducerCount + 2); - setupReducer(output, segment, statistics_enabled, reducerCount); ++ setupReducer(output, segment); attachCubeMetadata(cube, job.getConfiguration()); @@@ -143,22 -147,29 +126,32 @@@ job.setMapOutputValueClass(Text.class); } - private void setupReducer(Path output, int numberOfReducers) throws IOException { - private void setupReducer(Path output, CubeSegment cubeSeg, String statistics_enabled, int reducerCount) ++ private void setupReducer(Path output, CubeSegment cubeSeg) + throws IOException { - int numberOfReducers = reducerCount; - if ("true".equalsIgnoreCase(statistics_enabled)) { - int hllShardBase = MapReduceUtil.getHLLShardBase(cubeSeg); - job.getConfiguration().setInt(BatchConstants.CFG_HLL_SHARD_BASE, hllShardBase); - numberOfReducers += (1 + hllShardBase); ++ FactDistinctColumnsReducerMapping reducerMapping = new FactDistinctColumnsReducerMapping(cubeSeg.getCubeInstance()); ++ int numberOfReducers = reducerMapping.getTotalReducerNum(); ++ if (numberOfReducers > 250) { ++ throw new IllegalArgumentException( ++ "The max reducer number for FactDistinctColumnsJob is 250, but now it is " ++ + numberOfReducers ++ + ", decrease 'kylin.engine.mr.uhc-reducer-count'"); + } ++ job.setReducerClass(FactDistinctColumnsReducer.class); job.setPartitionerClass(FactDistinctColumnPartitioner.class); job.setNumReduceTasks(numberOfReducers); ++ job.getConfiguration().setInt(BatchConstants.CFG_HLL_REDUCER_NUM, reducerMapping.getCuboidRowCounterReducerNum()); -- //make each reducer output to respective dir ++ // make each reducer output to respective dir MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_COLUMN, SequenceFileOutputFormat.class, NullWritable.class, Text.class); MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_DICT, SequenceFileOutputFormat.class, NullWritable.class, BytesWritable.class); MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_STATISTICS, SequenceFileOutputFormat.class, 
LongWritable.class, BytesWritable.class); MultipleOutputs.addNamedOutput(job, BatchConstants.CFG_OUTPUT_PARTITION, TextOutputFormat.class, NullWritable.class, LongWritable.class); -- FileOutputFormat.setOutputPath(job, output); job.getConfiguration().set(BatchConstants.CFG_OUTPUT_PATH, output.toString()); -- //prevent to create zero-sized default output ++ // prevent to create zero-sized default output LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class); deletePath(job.getConfiguration(), output);