http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java index 2b63193..10c3bbe 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,35 +18,21 @@ */ package org.apache.lens.cube.parse; -import java.text.DateFormat; -import java.text.ParseException; -import java.text.SimpleDateFormat; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.incompletePartitions; +import static org.apache.lens.cube.parse.CandidateTablePruneCause.partitionColumnsMissing; import java.util.*; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.lens.cube.metadata.DateUtil.WSPACE; -import static org.apache.lens.cube.metadata.MetastoreUtil.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode.*; -import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipStorageCode.*; import org.apache.lens.cube.metadata.*; -import org.apache.lens.cube.parse.CandidateTablePruneCause.*; +import org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode; +import 
org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode; import org.apache.lens.server.api.error.LensException; -import org.apache.lens.server.api.metastore.*; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.util.ReflectionUtils; import com.google.common.collect.Lists; -import com.google.common.collect.Sets; - import lombok.extern.slf4j.Slf4j; - /** * Resolve storages and partitions of all candidate tables and prunes candidate tables with missing storages or * partitions. @@ -57,35 +43,13 @@ class StorageTableResolver implements ContextRewriter { private final Configuration conf; private final List<String> supportedStorages; private final boolean allStoragesSupported; - CubeMetastoreClient client; private final boolean failOnPartialData; private final List<String> validDimTables; - private final Map<CubeFactTable, Map<UpdatePeriod, Set<String>>> validStorageMap = new HashMap<>(); - private String processTimePartCol = null; private final UpdatePeriod maxInterval; + // TODO union : Remove this. All partitions are stored in the StorageCandidate. 
private final Map<String, Set<String>> nonExistingPartitions = new HashMap<>(); - private TimeRangeWriter rangeWriter; - private DateFormat partWhereClauseFormat = null; + private CubeMetastoreClient client; private PHASE phase; - private HashMap<CubeFactTable, Map<String, SkipStorageCause>> skipStorageCausesPerFact; - private float completenessThreshold; - private String completenessPartCol; - - enum PHASE { - FACT_TABLES, FACT_PARTITIONS, DIM_TABLE_AND_PARTITIONS; - - static PHASE first() { - return values()[0]; - } - - static PHASE last() { - return values()[values().length - 1]; - } - - PHASE next() { - return values()[(this.ordinal() + 1) % values().length]; - } - } StorageTableResolver(Configuration conf) { this.conf = conf; @@ -94,24 +58,13 @@ class StorageTableResolver implements ContextRewriter { this.failOnPartialData = conf.getBoolean(CubeQueryConfUtil.FAIL_QUERY_ON_PARTIAL_DATA, false); String str = conf.get(CubeQueryConfUtil.VALID_STORAGE_DIM_TABLES); validDimTables = StringUtils.isBlank(str) ? 
null : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); - this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); String maxIntervalStr = conf.get(CubeQueryConfUtil.QUERY_MAX_INTERVAL); if (maxIntervalStr != null) { - this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr); + this.maxInterval = UpdatePeriod.valueOf(maxIntervalStr.toUpperCase()); } else { this.maxInterval = null; } - rangeWriter = - ReflectionUtils.newInstance(conf.getClass(CubeQueryConfUtil.TIME_RANGE_WRITER_CLASS, - CubeQueryConfUtil.DEFAULT_TIME_RANGE_WRITER, TimeRangeWriter.class), this.conf); - String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); - if (formatStr != null) { - partWhereClauseFormat = new SimpleDateFormat(formatStr); - } this.phase = PHASE.first(); - completenessThreshold = conf.getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, - CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); - completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); } private List<String> getSupportedStorages(Configuration conf) { @@ -122,55 +75,88 @@ class StorageTableResolver implements ContextRewriter { return null; } - public boolean isStorageSupported(String storage) { + private boolean isStorageSupportedOnDriver(String storage) { return allStoragesSupported || supportedStorages.contains(storage); } - Map<String, List<String>> storagePartMap = new HashMap<String, List<String>>(); - @Override public void rewriteContext(CubeQueryContext cubeql) throws LensException { client = cubeql.getMetastoreClient(); switch (phase) { - case FACT_TABLES: - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve storage table names - resolveFactStorageTableNames(cubeql); + case STORAGE_TABLES: + if (!cubeql.getCandidates().isEmpty()) { + resolveStorageTable(cubeql); } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES); break; - case FACT_PARTITIONS: - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve storage 
partitions - resolveFactStoragePartitions(cubeql); - } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.NO_CANDIDATE_STORAGES); - if (client != null && client.isDataCompletenessCheckEnabled()) { - if (!cubeql.getCandidateFacts().isEmpty()) { - // resolve incomplete fact partition - resolveFactCompleteness(cubeql); - } - cubeql.pruneCandidateFactSet(CandidateTablePruneCode.INCOMPLETE_PARTITION); + case STORAGE_PARTITIONS: + if (!cubeql.getCandidates().isEmpty()) { + resolveStoragePartitions(cubeql); } break; case DIM_TABLE_AND_PARTITIONS: resolveDimStorageTablesAndPartitions(cubeql); if (cubeql.getAutoJoinCtx() != null) { // After all candidates are pruned after storage resolver, prune join paths. - cubeql.getAutoJoinCtx().pruneAllPaths(cubeql.getCube(), cubeql.getCandidateFacts(), null); + cubeql.getAutoJoinCtx() + .pruneAllPaths(cubeql.getCube(), CandidateUtil.getStorageCandidates(cubeql.getCandidates()), null); cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables()); cubeql.getAutoJoinCtx().refreshJoinPathColumns(); } + // TODO union : What is this? We may not need this as it non existing partitions are stored in StorageCandidate + cubeql.setNonexistingParts(nonExistingPartitions); break; } - //Doing this on all three phases. Keep updating cubeql with the current identified missing partitions. - cubeql.setNonexistingParts(nonExistingPartitions); phase = phase.next(); } + /** + * Each candidate in the set is a complex candidate. We will evaluate each one to get + * all the partitions needed to answer the query. 
+ * + * @param cubeql cube query context + */ + private void resolveStoragePartitions(CubeQueryContext cubeql) throws LensException { + Iterator<Candidate> candidateIterator = cubeql.getCandidates().iterator(); + while (candidateIterator.hasNext()) { + Candidate candidate = candidateIterator.next(); + boolean isComplete = true; + boolean isTimeRangeAnswerableByThisCandidate = true; + for (TimeRange range : cubeql.getTimeRanges()) { + if (!candidate.isTimeRangeCoverable(range)) { + isTimeRangeAnswerableByThisCandidate = false; + log.info("Not considering candidate:{} as it can not cover time range {}", candidate, range); + cubeql.addCandidatePruningMsg(candidate, + CandidateTablePruneCause.storageNotAvailableInRange(Lists.newArrayList(range))); + break; + } + isComplete &= candidate.evaluateCompleteness(range, range, failOnPartialData); + } + if (!isTimeRangeAnswerableByThisCandidate) { + candidateIterator.remove(); + } else if (failOnPartialData && !isComplete) { + candidateIterator.remove(); + log.info("Not considering candidate:{} as its data is not is not complete", candidate); + Set<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate); + for (StorageCandidate sc : scSet) { + if (!sc.getNonExistingPartitions().isEmpty()) { + cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.missingPartitions(sc.getNonExistingPartitions())); + } else if (!sc.getDataCompletenessMap().isEmpty()) { + cubeql.addStoragePruningMsg(sc, incompletePartitions(sc.getDataCompletenessMap())); + } + } + } else if (candidate.getParticipatingPartitions().isEmpty() + && candidate instanceof StorageCandidate + && ((StorageCandidate) candidate).getNonExistingPartitions().isEmpty()) { + candidateIterator.remove(); + cubeql.addCandidatePruningMsg(candidate, + new CandidateTablePruneCause(CandidateTablePruneCode.NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE)); + } + } + } + private void resolveDimStorageTablesAndPartitions(CubeQueryContext cubeql) throws LensException { - 
Set<Dimension> allDims = new HashSet<Dimension>(cubeql.getDimensions()); + Set<Dimension> allDims = new HashSet<>(cubeql.getDimensions()); for (Aliased<Dimension> dim : cubeql.getOptionalDimensions()) { allDims.add(dim.getObject()); } @@ -184,21 +170,23 @@ class StorageTableResolver implements ContextRewriter { CandidateDim candidate = i.next(); CubeDimensionTable dimtable = candidate.dimtable; if (dimtable.getStorages().isEmpty()) { - cubeql.addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause( - CandidateTablePruneCode.MISSING_STORAGES)); + cubeql + .addDimPruningMsgs(dim, dimtable, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES)); i.remove(); continue; } - Set<String> storageTables = new HashSet<String>(); + Set<String> storageTables = new HashSet<>(); Map<String, String> whereClauses = new HashMap<String, String>(); boolean foundPart = false; - Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>(); + // TODO union : We have to remove all usages of a deprecated class. 
+ Map<String, CandidateTablePruneCode> skipStorageCauses = new HashMap<>(); for (String storage : dimtable.getStorages()) { - if (isStorageSupported(storage)) { - String tableName = getFactOrDimtableStorageTableName(dimtable.getName(), storage).toLowerCase(); + if (isStorageSupportedOnDriver(storage)) { + String tableName = MetastoreUtil.getFactOrDimtableStorageTableName(dimtable.getName(), + storage).toLowerCase(); if (validDimTables != null && !validDimTables.contains(tableName)) { log.info("Not considering dim storage table:{} as it is not a valid dim storage", tableName); - skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.INVALID)); + skipStorageCauses.put(tableName, CandidateTablePruneCode.INVALID); continue; } @@ -212,13 +200,12 @@ class StorageTableResolver implements ContextRewriter { } if (!failOnPartialData || foundPart) { storageTables.add(tableName); - String whereClause = - StorageUtil.getWherePartClause(dim.getTimedDimension(), null, - StorageConstants.getPartitionsForLatest()); + String whereClause = StorageUtil + .getWherePartClause(dim.getTimedDimension(), null, StorageConstants.getPartitionsForLatest()); whereClauses.put(tableName, whereClause); } else { log.info("Not considering dim storage table:{} as no dim partitions exist", tableName); - skipStorageCauses.put(tableName, new SkipStorageCause(SkipStorageCode.NO_PARTITIONS)); + skipStorageCauses.put(tableName, CandidateTablePruneCode.NO_PARTITIONS); } } else { storageTables.add(tableName); @@ -226,7 +213,7 @@ class StorageTableResolver implements ContextRewriter { } } else { log.info("Storage:{} is not supported", storage); - skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED)); + skipStorageCauses.put(storage, CandidateTablePruneCode.UNSUPPORTED_STORAGE); } } if (!foundPart) { @@ -234,7 +221,8 @@ class StorageTableResolver implements ContextRewriter { } if (storageTables.isEmpty()) { log.info("Not considering dim table:{} as no candidate storage 
tables eixst", dimtable); - cubeql.addDimPruningMsgs(dim, dimtable, noCandidateStorages(skipStorageCauses)); + cubeql.addDimPruningMsgs(dim, dimtable, + CandidateTablePruneCause.noCandidateStoragesForDimtable(skipStorageCauses)); i.remove(); continue; } @@ -245,619 +233,151 @@ class StorageTableResolver implements ContextRewriter { } } - // Resolves all the storage table names, which are valid for each updatePeriod - private void resolveFactStorageTableNames(CubeQueryContext cubeql) throws LensException { - Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); - skipStorageCausesPerFact = new HashMap<>(); - while (i.hasNext()) { - CubeFactTable fact = i.next().fact; - if (fact.getUpdatePeriods().isEmpty()) { - cubeql.addFactPruningMsgs(fact, new CandidateTablePruneCause(CandidateTablePruneCode.MISSING_STORAGES)); - i.remove(); + /** + * Following storages are removed: + * 1. The storage is not supported by driver. + * 2. The storage is not in the valid storage list. + * 3. The storage is not in any time range in the query. + * 4. The storage having no valid update period. + * + * This method also creates a list of valid update periods and stores them into {@link StorageCandidate}. + * + * TODO union : Do fourth point before 3. 
+ */ + private void resolveStorageTable(CubeQueryContext cubeql) throws LensException { + Iterator<Candidate> it = cubeql.getCandidates().iterator(); + while (it.hasNext()) { + Candidate c = it.next(); + assert (c instanceof StorageCandidate); + StorageCandidate sc = (StorageCandidate) c; + String storageTable = sc.getStorageName(); + // first check: if the storage is supported on driver + if (!isStorageSupportedOnDriver(storageTable)) { + log.info("Skipping storage: {} as it is not supported", storageTable); + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE)); + it.remove(); continue; } - Map<UpdatePeriod, Set<String>> storageTableMap = new TreeMap<UpdatePeriod, Set<String>>(); - validStorageMap.put(fact, storageTableMap); - String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(fact.getName())); + String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName())); List<String> validFactStorageTables = StringUtils.isBlank(str) ? 
null : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); - Map<String, SkipStorageCause> skipStorageCauses = new HashMap<>(); - - for (Map.Entry<String, Set<UpdatePeriod>> entry : fact.getUpdatePeriods().entrySet()) { - String storage = entry.getKey(); - // skip storages that are not supported - if (!isStorageSupported(storage)) { - log.info("Skipping storage: {} as it is not supported", storage); - skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.UNSUPPORTED)); - continue; - } - String table = getStorageTableName(fact, storage, validFactStorageTables); - // skip the update period if the storage is not valid - if (table == null) { - skipStorageCauses.put(storage, new SkipStorageCause(SkipStorageCode.INVALID)); - continue; - } - List<String> validUpdatePeriods = - CubeQueryConfUtil.getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(fact.getName(), storage)); - - boolean isStorageAdded = false; - Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<String, SkipUpdatePeriodCode>(); - for (UpdatePeriod updatePeriod : entry.getValue()) { - if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { - log.info("Skipping update period {} for fact {}", updatePeriod, fact); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.QUERY_INTERVAL_BIGGER); - continue; - } - if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) { - log.info("Skipping update period {} for fact {} for storage {}", updatePeriod, fact, storage); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID); - continue; - } - Set<String> storageTables = storageTableMap.get(updatePeriod); - if (storageTables == null) { - storageTables = new LinkedHashSet<>(); - storageTableMap.put(updatePeriod, storageTables); - } - isStorageAdded = true; - log.debug("Adding storage table:{} for fact:{} for update period {}", table, fact, updatePeriod); - 
storageTables.add(table); - } - if (!isStorageAdded) { - skipStorageCauses.put(storage, SkipStorageCause.noCandidateUpdatePeriod(skipUpdatePeriodCauses)); - } - } - skipStorageCausesPerFact.put(fact, skipStorageCauses); - if (storageTableMap.isEmpty()) { - log.info("Not considering fact table:{} as it does not have any storage tables", fact); - cubeql.addFactPruningMsgs(fact, noCandidateStorages(skipStorageCauses)); - i.remove(); - } - } - } - - private TreeSet<UpdatePeriod> getValidUpdatePeriods(CubeFactTable fact) { - TreeSet<UpdatePeriod> set = new TreeSet<UpdatePeriod>(); - set.addAll(validStorageMap.get(fact).keySet()); - return set; - } - - String getStorageTableName(CubeFactTable fact, String storage, List<String> validFactStorageTables) { - String tableName = getFactOrDimtableStorageTableName(fact.getName(), storage).toLowerCase(); - if (validFactStorageTables != null && !validFactStorageTables.contains(tableName)) { - log.info("Skipping storage table {} as it is not valid", tableName); - return null; - } - return tableName; - } - - private TimeRange getFallbackRange(TimeRange range, CandidateFact cfact, CubeQueryContext cubeql) - throws LensException { - Cube baseCube = cubeql.getBaseCube(); - ArrayList<String> tableNames = Lists.newArrayList(cfact.fact.getName(), cubeql.getCube().getName()); - if (!cubeql.getCube().getName().equals(baseCube.getName())) { - tableNames.add(baseCube.getName()); - } - String fallBackString = null; - String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn()); - for (String tableName : tableNames) { - fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters() - .get(MetastoreConstants.TIMEDIM_RELATION + timedim); - if (StringUtils.isNotBlank(fallBackString)) { - break; - } - } - if (StringUtils.isBlank(fallBackString)) { - return null; - } - Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, "")); - if (!matcher.matches()) { - 
return null; - } - DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim()); - DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim()); - String relatedTimeDim = matcher.group(1).trim(); - String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim); - return TimeRange.getBuilder() - .fromDate(diff2.negativeOffsetFrom(range.getFromDate())) - .toDate(diff1.negativeOffsetFrom(range.getToDate())) - .partitionColumn(fallbackPartCol).build(); - } - - private void resolveFactStoragePartitions(CubeQueryContext cubeql) throws LensException { - // Find candidate tables wrt supported storages - Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); - while (i.hasNext()) { - CandidateFact cfact = i.next(); - Map<TimeRange, String> whereClauseForFallback = new LinkedHashMap<TimeRange, String>(); - List<FactPartition> answeringParts = new ArrayList<>(); - Map<String, SkipStorageCause> skipStorageCauses = skipStorageCausesPerFact.get(cfact.fact); - if (skipStorageCauses == null) { - skipStorageCauses = new HashMap<>(); - } - PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns(); - boolean noPartsForRange = false; - Set<String> unsupportedTimeDims = Sets.newHashSet(); - Set<String> partColsQueried = Sets.newHashSet(); - for (TimeRange range : cubeql.getTimeRanges()) { - partColsQueried.add(range.getPartitionColumn()); - StringBuilder extraWhereClause = new StringBuilder(); - Set<FactPartition> rangeParts = getPartitions(cfact.fact, range, skipStorageCauses, missingParts); - // If no partitions were found, then we'll fallback. 
- String partCol = range.getPartitionColumn(); - boolean partColNotSupported = rangeParts.isEmpty(); - for (String storage : cfact.fact.getStorages()) { - String storageTableName = getFactOrDimtableStorageTableName(cfact.fact.getName(), storage).toLowerCase(); - partColNotSupported &= skipStorageCauses.containsKey(storageTableName) - && skipStorageCauses.get(storageTableName).getCause().equals(PART_COL_DOES_NOT_EXIST) - && skipStorageCauses.get(storageTableName).getNonExistantPartCols().contains(partCol); - } - TimeRange prevRange = range; - String sep = ""; - while (rangeParts.isEmpty()) { - // TODO: should we add a condition whether on range's partcol any missing partitions are not there - String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol); - if (partColNotSupported && !cfact.getColumns().contains(timeDim)) { - unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(range.getPartitionColumn())); - break; - } - TimeRange fallBackRange = getFallbackRange(prevRange, cfact, cubeql); - log.info("No partitions for range:{}. 
fallback range: {}", range, fallBackRange); - if (fallBackRange == null) { - break; - } - partColsQueried.add(fallBackRange.getPartitionColumn()); - rangeParts = getPartitions(cfact.fact, fallBackRange, skipStorageCauses, missingParts); - extraWhereClause.append(sep) - .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim)); - sep = " AND "; - prevRange = fallBackRange; - partCol = prevRange.getPartitionColumn(); - if (!rangeParts.isEmpty()) { - break; - } - } - whereClauseForFallback.put(range, extraWhereClause.toString()); - if (rangeParts.isEmpty()) { - log.info("No partitions for fallback range:{}", range); - noPartsForRange = true; - continue; - } - // If multiple storage tables are part of the same fact, - // capture range->storage->partitions - Map<String, LinkedHashSet<FactPartition>> tablePartMap = new HashMap<String, LinkedHashSet<FactPartition>>(); - for (FactPartition factPart : rangeParts) { - for (String table : factPart.getStorageTables()) { - if (!tablePartMap.containsKey(table)) { - tablePartMap.put(table, new LinkedHashSet<>(Collections.singletonList(factPart))); - } else { - LinkedHashSet<FactPartition> storagePart = tablePartMap.get(table); - storagePart.add(factPart); - } - } - } - cfact.getRangeToStoragePartMap().put(range, tablePartMap); - cfact.incrementPartsQueried(rangeParts.size()); - answeringParts.addAll(rangeParts); - cfact.getPartsQueried().addAll(rangeParts); - } - if (!unsupportedTimeDims.isEmpty()) { - log.info("Not considering fact table:{} as it doesn't support time dimensions: {}", cfact.fact, - unsupportedTimeDims); - cubeql.addFactPruningMsgs(cfact.fact, timeDimNotSupported(unsupportedTimeDims)); - i.remove(); - continue; - } - Set<String> nonExistingParts = missingParts.toSet(partColsQueried); - if (!nonExistingParts.isEmpty()) { - addNonExistingParts(cfact.fact.getName(), nonExistingParts); - } - if (cfact.getNumQueriedParts() == 0 || (failOnPartialData && (noPartsForRange || 
!nonExistingParts.isEmpty()))) { - log.info("Not considering fact table:{} as it could not find partition for given ranges: {}", cfact.fact, - cubeql.getTimeRanges()); - /* - * This fact is getting discarded because of any of following reasons: - * 1. Has missing partitions - * 2. All Storage tables were skipped for some reasons. - * 3. Storage tables do not have the update period for the timerange queried. - */ - if (failOnPartialData && !nonExistingParts.isEmpty()) { - cubeql.addFactPruningMsgs(cfact.fact, missingPartitions(nonExistingParts)); - } else if (!skipStorageCauses.isEmpty()) { - CandidateTablePruneCause cause = noCandidateStorages(skipStorageCauses); - cubeql.addFactPruningMsgs(cfact.fact, cause); - } else { - CandidateTablePruneCause cause = - new CandidateTablePruneCause(NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE); - cubeql.addFactPruningMsgs(cfact.fact, cause); - } - i.remove(); + storageTable = sc.getStorageTable(); + // Check if storagetable is in the list of valid storages. 
+ if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) { + log.info("Skipping storage table {} as it is not valid", storageTable); + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE)); + it.remove(); continue; } - // Map from storage to covering parts - Map<String, Set<FactPartition>> minimalStorageTables = new LinkedHashMap<String, Set<FactPartition>>(); - StorageUtil.getMinimalAnsweringTables(answeringParts, minimalStorageTables); - if (minimalStorageTables.isEmpty()) { - log.info("Not considering fact table:{} as it does not have any storage tables", cfact); - cubeql.addFactPruningMsgs(cfact.fact, noCandidateStorages(skipStorageCauses)); - i.remove(); + List<String> validUpdatePeriods = CubeQueryConfUtil + .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName())); + boolean isUpdatePeriodForStorageAdded = false; + Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>(); + + if (cubeql.getTimeRanges().stream().noneMatch(range -> CandidateUtil.isPartiallyValidForTimeRange(sc, range))) { + cubeql.addStoragePruningMsg(sc, + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); + it.remove(); continue; } - Set<String> storageTables = new LinkedHashSet<>(); - storageTables.addAll(minimalStorageTables.keySet()); - cfact.setStorageTables(storageTables); - // Update range->storage->partitions with time range where clause - for (TimeRange trange : cfact.getRangeToStoragePartMap().keySet()) { - Map<String, String> rangeToWhere = new HashMap<>(); - for (Map.Entry<String, Set<FactPartition>> entry : minimalStorageTables.entrySet()) { - String table = entry.getKey(); - Set<FactPartition> minimalParts = entry.getValue(); - - LinkedHashSet<FactPartition> rangeParts = cfact.getRangeToStoragePartMap().get(trange).get(table); - LinkedHashSet<FactPartition> minimalPartsCopy = Sets.newLinkedHashSet(); 
- - if (rangeParts != null) { - minimalPartsCopy.addAll(minimalParts); - minimalPartsCopy.retainAll(rangeParts); - } - if (!StringUtils.isEmpty(whereClauseForFallback.get(trange))) { - rangeToWhere.put(table, "((" - + rangeWriter.getTimeRangeWhereClause(cubeql, cubeql.getAliasForTableName(cubeql.getCube().getName()), - minimalPartsCopy) + ") and (" + whereClauseForFallback.get(trange) + "))"); - } else { - rangeToWhere.put(table, rangeWriter.getTimeRangeWhereClause(cubeql, - cubeql.getAliasForTableName(cubeql.getCube().getName()), minimalPartsCopy)); - } - } - cfact.getRangeToStorageWhereMap().put(trange, rangeToWhere); - } - log.info("Resolved partitions for fact {}: {} storageTables:{}", cfact, answeringParts, storageTables); - } - } - private static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias, - Set<String> measureTag, - Map<String, String> tagToMeasureOrExprMap) { - CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol); - if (column != null && column.getTags() != null) { - String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG); - //Checking if dataCompletenessTag is set for queried measure - if (dataCompletenessTag != null) { - measureTag.add(dataCompletenessTag); - String value = tagToMeasureOrExprMap.get(dataCompletenessTag); - if (value == null) { - tagToMeasureOrExprMap.put(dataCompletenessTag, alias); + // Populate valid update periods abd check validity at update period level + for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) { + if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { + // if user supplied max interval, all intervals larger than that are useless. 
+ log.info("Skipping update period {} for candidate {} since it's more than max interval supplied({})", + updatePeriod, sc.getStorageTable(), maxInterval); + skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.UPDATE_PERIOD_BIGGER_THAN_MAX); + } else if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) { + // if user supplied valid update periods, other update periods are useless + log.info("Skipping update period {} for candidate {} for storage {} since it's invalid", + updatePeriod, sc.getStorageTable(), storageTable); + skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID); + } else if (!sc.isUpdatePeriodUseful(updatePeriod)) { + // if the storage candidate finds this update useful to keep looking at the time ranges queried + skipUpdatePeriodCauses.put(updatePeriod.toString(), + SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD); } else { - value = value.concat(",").concat(alias); - tagToMeasureOrExprMap.put(dataCompletenessTag, value); - } - return true; - } - } - return false; - } - - private static void processMeasuresFromExprMeasures(CubeQueryContext cubeql, Set<String> measureTag, - Map<String, String> tagToMeasureOrExprMap) { - boolean isExprProcessed; - String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName()); - for (String expr : cubeql.getQueriedExprsWithMeasures()) { - isExprProcessed = false; - for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias) - .getAllExprs()) { - if (esc.getTblAliasToColumns().get(cubeAlias) != null) { - for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) { - if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) { - /* This is done to associate the expression with one of the dataCompletenessTag for the measures. 
- So, even if the expression is composed of measures with different dataCompletenessTags, we will be - determining the dataCompleteness from one of the measure and this expression is grouped with the - other queried measures that have the same dataCompletenessTag. */ - isExprProcessed = true; - break; - } - } - } - if (isExprProcessed) { - break; + isUpdatePeriodForStorageAdded = true; + sc.addValidUpdatePeriod(updatePeriod); } } - } - } - - private void resolveFactCompleteness(CubeQueryContext cubeql) throws LensException { - if (client == null || client.getCompletenessChecker() == null || completenessPartCol == null) { - return; - } - DataCompletenessChecker completenessChecker = client.getCompletenessChecker(); - Set<String> measureTag = new HashSet<>(); - Map<String, String> tagToMeasureOrExprMap = new HashMap<>(); - - processMeasuresFromExprMeasures(cubeql, measureTag, tagToMeasureOrExprMap); - - Set<String> measures = cubeql.getQueriedMsrs(); - if (measures == null) { - measures = new HashSet<>(); - } - for (String measure : measures) { - processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap); - } - //Checking if dataCompletenessTag is set for the fact - if (measureTag.isEmpty()) { - log.info("No Queried measures with the dataCompletenessTag, hence skipping the availability check"); - return; - } - Iterator<CandidateFact> i = cubeql.getCandidateFacts().iterator(); - DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - formatter.setTimeZone(TimeZone.getTimeZone("UTC")); - while (i.hasNext()) { - CandidateFact cFact = i.next(); - // Map from measure to the map from partition to %completeness - Map<String, Map<String, Float>> incompleteMeasureData = new HashMap<>(); - - String factDataCompletenessTag = cFact.fact.getDataCompletenessTag(); - if (factDataCompletenessTag == null) { - log.info("Not checking completeness for the fact table:{} as the dataCompletenessTag is not set", cFact.fact); - continue; + // 
For DEBUG purpose only to see why some update periods are skipped. + if (!skipUpdatePeriodCauses.isEmpty()) { + sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses); } - boolean isFactDataIncomplete = false; - for (TimeRange range : cubeql.getTimeRanges()) { - if (!range.getPartitionColumn().equals(completenessPartCol)) { - log.info("Completeness check not available for partCol:{}", range.getPartitionColumn()); - continue; + // if no update periods were added in previous section, we skip this storage candidate + if (!isUpdatePeriodForStorageAdded) { + if (skipUpdatePeriodCauses.values().stream().allMatch( + SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) { + // all update periods bigger than query range, it means time range not answerable. + cubeql.addStoragePruningMsg(sc, + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); + } else { // Update periods are rejected for multiple reasons. + cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses)); } - Date from = range.getFromDate(); - Date to = range.getToDate(); - Map<String, Map<Date, Float>> completenessMap = completenessChecker.getCompleteness(factDataCompletenessTag, - from, to, measureTag); - if (completenessMap != null && !completenessMap.isEmpty()) { - for (Map.Entry<String, Map<Date, Float>> measureCompleteness : completenessMap.entrySet()) { - String tag = measureCompleteness.getKey(); - for (Map.Entry<Date, Float> completenessResult : measureCompleteness.getValue().entrySet()) { - if (completenessResult.getValue() < completenessThreshold) { - log.info("Completeness for the measure_tag {} is {}, threshold: {}, for the hour {}", tag, - completenessResult.getValue(), completenessThreshold, - formatter.format(completenessResult.getKey())); - String measureorExprFromTag = tagToMeasureOrExprMap.get(tag); - Map<String, Float> incompletePartition = incompleteMeasureData.get(measureorExprFromTag); - if 
(incompletePartition == null) { - incompletePartition = new HashMap<>(); - incompleteMeasureData.put(measureorExprFromTag, incompletePartition); + it.remove(); + } else { + //set the dates again as they can change based on ValidUpdatePeriod + sc.setStorageStartAndEndDate(); + Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size()); + for (TimeRange range : cubeql.getTimeRanges()) { + CandidateTablePruneCause pruningCauseForThisTimeRange = null; + if (!CandidateUtil.isPartiallyValidForTimeRange(sc, range)) { + //This is the prune cause + pruningCauseForThisTimeRange = + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); + } else if (cubeql.shouldReplaceTimeDimWithPart()) { + if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) { + pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn()); + TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql); + while (fallBackRange != null) { + pruningCauseForThisTimeRange = null; + if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), + fallBackRange.getPartitionColumn())) { + pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn()); + fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql); + } else { + if (!CandidateUtil.isPartiallyValidForTimeRange(sc, fallBackRange)) { + pruningCauseForThisTimeRange = + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); + } + break; } - incompletePartition.put(formatter.format(completenessResult.getKey()), completenessResult.getValue()); - isFactDataIncomplete = true; } } } + + if (pruningCauseForThisTimeRange != null) { + allPruningCauses.add(pruningCauseForThisTimeRange); + } } - } - if (isFactDataIncomplete) { - log.info("Fact table:{} has partitions with incomplete data: {} for given ranges: {}", 
cFact.fact, - incompleteMeasureData, cubeql.getTimeRanges()); - if (failOnPartialData) { - i.remove(); - cubeql.addFactPruningMsgs(cFact.fact, incompletePartitions(incompleteMeasureData)); - } else { - cFact.setDataCompletenessMap(incompleteMeasureData); + if (!allPruningCauses.isEmpty()) { + // TODO if this storage can answer atleast one time range , why prune it ? + it.remove(); + cubeql.addStoragePruningMsg(sc, allPruningCauses.toArray(new CandidateTablePruneCause[0])); } } } } - void addNonExistingParts(String name, Set<String> nonExistingParts) { + private void addNonExistingParts(String name, Set<String> nonExistingParts) { nonExistingPartitions.put(name, nonExistingParts); } - private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, - Map<String, SkipStorageCause> skipStorageCauses, - PartitionRangesForPartitionColumns missingPartitions) throws LensException { - try { - return getPartitions(fact, range, getValidUpdatePeriods(fact), true, failOnPartialData, skipStorageCauses, - missingPartitions); - } catch (Exception e) { - throw new LensException(e); - } - } - - private Set<FactPartition> getPartitions(CubeFactTable fact, TimeRange range, TreeSet<UpdatePeriod> updatePeriods, - boolean addNonExistingParts, boolean failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses, - PartitionRangesForPartitionColumns missingPartitions) - throws Exception { - Set<FactPartition> partitions = new TreeSet<>(); - if (range != null && range.isCoverableBy(updatePeriods) - && getPartitions(fact, range.getFromDate(), range.getToDate(), range.getPartitionColumn(), partitions, - updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions)) { - return partitions; - } else { - return new TreeSet<>(); - } - } - - private boolean getPartitions(CubeFactTable fact, Date fromDate, Date toDate, String partCol, - Set<FactPartition> partitions, TreeSet<UpdatePeriod> updatePeriods, - boolean addNonExistingParts, boolean 
failOnPartialData, Map<String, SkipStorageCause> skipStorageCauses, - PartitionRangesForPartitionColumns missingPartitions) - throws Exception { - log.info("getPartitions for {} from fromDate:{} toDate:{}", fact, fromDate, toDate); - if (fromDate.equals(toDate) || fromDate.after(toDate)) { - return true; - } - UpdatePeriod interval = CubeFactTable.maxIntervalInRange(fromDate, toDate, updatePeriods); - if (interval == null) { - log.info("No max interval for range: {} to {}", fromDate, toDate); - return false; - } - log.debug("Max interval for {} is: {}", fact, interval); - Set<String> storageTbls = new LinkedHashSet<String>(); - storageTbls.addAll(validStorageMap.get(fact).get(interval)); - - if (interval == UpdatePeriod.CONTINUOUS && rangeWriter.getClass().equals(BetweenTimeRangeWriter.class)) { - for (String storageTbl : storageTbls) { - FactPartition part = new FactPartition(partCol, fromDate, interval, null, partWhereClauseFormat); - partitions.add(part); - part.getStorageTables().add(storageTbl); - part = new FactPartition(partCol, toDate, interval, null, partWhereClauseFormat); - partitions.add(part); - part.getStorageTables().add(storageTbl); - log.info("Added continuous fact partition for storage table {}", storageTbl); - } - return true; - } - - Iterator<String> it = storageTbls.iterator(); - while (it.hasNext()) { - String storageTableName = it.next(); - if (!client.isStorageTableCandidateForRange(storageTableName, fromDate, toDate)) { - skipStorageCauses.put(storageTableName, new SkipStorageCause(RANGE_NOT_ANSWERABLE)); - it.remove(); - } else if (!client.partColExists(storageTableName, partCol)) { - log.info("{} does not exist in {}", partCol, storageTableName); - skipStorageCauses.put(storageTableName, SkipStorageCause.partColDoesNotExist(partCol)); - it.remove(); - } - } - - if (storageTbls.isEmpty()) { - return false; - } - Date ceilFromDate = DateUtil.getCeilDate(fromDate, interval); - Date floorToDate = DateUtil.getFloorDate(toDate, interval); - - 
int lookAheadNumParts = - conf.getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(interval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); - - TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, interval, 1).iterator(); - // add partitions from ceilFrom to floorTo - while (iter.hasNext()) { - Date dt = iter.next(); - Date nextDt = iter.peekNext(); - FactPartition part = new FactPartition(partCol, dt, interval, null, partWhereClauseFormat); - log.debug("candidate storage tables for searching partitions: {}", storageTbls); - updateFactPartitionStorageTablesFrom(fact, part, storageTbls); - log.debug("Storage tables containing Partition {} are: {}", part, part.getStorageTables()); - if (part.isFound()) { - log.debug("Adding existing partition {}", part); - partitions.add(part); - log.debug("Looking for look ahead process time partitions for {}", part); - if (processTimePartCol == null) { - log.debug("processTimePartCol is null"); - } else if (partCol.equals(processTimePartCol)) { - log.debug("part column is process time col"); - } else if (updatePeriods.first().equals(interval)) { - log.debug("Update period is the least update period"); - } else if ((iter.getNumIters() - iter.getCounter()) > lookAheadNumParts) { - // see if this is the part of the last-n look ahead partitions - log.debug("Not a look ahead partition"); - } else { - log.debug("Looking for look ahead process time partitions for {}", part); - // check if finer partitions are required - // final partitions are required if no partitions from - // look-ahead - // process time are present - TimeRange.Iterable.Iterator processTimeIter = TimeRange.iterable(nextDt, lookAheadNumParts, - interval, 1).iterator(); - while (processTimeIter.hasNext()) { - Date pdt = processTimeIter.next(); - Date nextPdt = processTimeIter.peekNext(); - FactPartition processTimePartition = new FactPartition(processTimePartCol, pdt, interval, null, - partWhereClauseFormat); - 
updateFactPartitionStorageTablesFrom(fact, processTimePartition, - part.getStorageTables()); - if (processTimePartition.isFound()) { - log.debug("Finer parts not required for look-ahead partition :{}", part); - } else { - log.debug("Looked ahead process time partition {} is not found", processTimePartition); - TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>(); - newset.addAll(updatePeriods); - newset.remove(interval); - log.debug("newset of update periods:{}", newset); - if (!newset.isEmpty()) { - // Get partitions for look ahead process time - log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt); - Set<FactPartition> processTimeParts = - getPartitions(fact, TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn( - processTimePartCol).build(), newset, true, false, skipStorageCauses, missingPartitions); - log.debug("Look ahead partitions: {}", processTimeParts); - TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build(); - for (FactPartition pPart : processTimeParts) { - log.debug("Looking for finer partitions in pPart: {}", pPart); - for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) { - FactPartition innerPart = new FactPartition(partCol, date, pPart.getPeriod(), pPart, - partWhereClauseFormat); - updateFactPartitionStorageTablesFrom(fact, innerPart, pPart); - if (innerPart.isFound()) { - partitions.add(innerPart); - } - } - log.debug("added all sub partitions blindly in pPart: {}", pPart); - } - } - } - } - } - } else { - log.info("Partition:{} does not exist in any storage table", part); - TreeSet<UpdatePeriod> newset = new TreeSet<UpdatePeriod>(); - newset.addAll(updatePeriods); - newset.remove(interval); - if (!getPartitions(fact, dt, nextDt, partCol, partitions, newset, false, failOnPartialData, skipStorageCauses, - missingPartitions)) { + enum PHASE { + STORAGE_TABLES, STORAGE_PARTITIONS, DIM_TABLE_AND_PARTITIONS; - log.debug("Adding non existing partition {}", part); - if 
(addNonExistingParts) { - // Add non existing partitions for all cases of whether we populate all non existing or not. - missingPartitions.add(part); - if (!failOnPartialData) { - Set<String> st = getStorageTablesWithoutPartCheck(part, storageTbls); - if (st.isEmpty()) { - log.info("No eligible storage tables"); - return false; - } - partitions.add(part); - part.getStorageTables().addAll(st); - } - } else { - log.info("No finer granual partitions exist for {}", part); - return false; - } - } else { - log.debug("Finer granual partitions added for {}", part); - } - } + static PHASE first() { + return values()[0]; } - return getPartitions(fact, fromDate, ceilFromDate, partCol, partitions, - updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions) - && getPartitions(fact, floorToDate, toDate, partCol, partitions, - updatePeriods, addNonExistingParts, failOnPartialData, skipStorageCauses, missingPartitions); - } - private Set<String> getStorageTablesWithoutPartCheck(FactPartition part, - Set<String> storageTableNames) throws LensException, HiveException { - Set<String> validStorageTbls = new HashSet<>(); - for (String storageTableName : storageTableNames) { - // skip all storage tables for which are not eligible for this partition - if (client.isStorageTablePartitionACandidate(storageTableName, part.getPartSpec())) { - validStorageTbls.add(storageTableName); - } else { - log.info("Skipping {} as it is not valid for part {}", storageTableName, part.getPartSpec()); - } + static PHASE last() { + return values()[values().length - 1]; } - return validStorageTbls; - } - private void updateFactPartitionStorageTablesFrom(CubeFactTable fact, - FactPartition part, Set<String> storageTableNames) throws LensException, HiveException, ParseException { - for (String storageTableName : storageTableNames) { - // skip all storage tables for which are not eligible for this partition - if (client.isStorageTablePartitionACandidate(storageTableName, 
part.getPartSpec()) - && (client.factPartitionExists(fact, part, storageTableName))) { - part.getStorageTables().add(storageTableName); - part.setFound(true); - } + PHASE next() { + return values()[(this.ordinal() + 1) % values().length]; } } - - private void updateFactPartitionStorageTablesFrom(CubeFactTable fact, - FactPartition part, FactPartition pPart) throws LensException, HiveException, ParseException { - updateFactPartitionStorageTablesFrom(fact, part, pPart.getStorageTables()); - part.setFound(part.isFound() && pPart.isFound()); - } }
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java index f9636d1..f5cd540 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java @@ -7,7 +7,7 @@ * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an @@ -18,13 +18,19 @@ */ package org.apache.lens.cube.parse; +import static org.apache.lens.cube.metadata.DateUtil.WSPACE; + import java.util.*; +import java.util.regex.Matcher; +import java.util.regex.Pattern; -import org.apache.lens.cube.metadata.FactPartition; -import org.apache.lens.cube.metadata.StorageConstants; +import org.apache.lens.cube.metadata.*; +import org.apache.lens.server.api.error.LensException; import org.apache.commons.lang.StringUtils; +import com.google.common.collect.Lists; + public final class StorageUtil { private StorageUtil() { @@ -69,8 +75,8 @@ public final class StorageUtil { String sep = ""; for (String timePartCol : timedDimensions) { if (!timePartCol.equals(partCol)) { - sb.append(sep).append(alias).append(".").append(timePartCol) - .append(" != '").append(StorageConstants.LATEST_PARTITION_VALUE).append("'"); + sb.append(sep).append(alias).append(".").append(timePartCol).append(" != '") + .append(StorageConstants.LATEST_PARTITION_VALUE).append("'"); sep = " AND "; } } @@ -82,15 +88,11 @@ public final class StorageUtil { String sep = "(("; for (String clause : clauses) 
{ if (clause != null && !clause.isEmpty()) { - sb - .append(sep) - .append(clause); + sb.append(sep).append(clause); sep = ") AND ("; } } - return sb - .append(sep.equals("((") ? "" : "))") - .toString(); + return sb.append(sep.equals("((") ? "" : "))").toString(); } /** @@ -161,4 +163,112 @@ public final class StorageUtil { return null; } } + + /** + * Get fallback range + * + * @param range + * @param factName + * @param cubeql + * @return + * @throws LensException + */ + public static TimeRange getFallbackRange(TimeRange range, String factName, CubeQueryContext cubeql) + throws LensException { + Cube baseCube = cubeql.getBaseCube(); + ArrayList<String> tableNames = Lists.newArrayList(factName, cubeql.getCube().getName()); + if (!cubeql.getCube().getName().equals(baseCube.getName())) { + tableNames.add(baseCube.getName()); + } + String fallBackString = null; + String timedim = baseCube.getTimeDimOfPartitionColumn(range.getPartitionColumn()); + for (String tableName : tableNames) { + fallBackString = cubeql.getMetastoreClient().getTable(tableName).getParameters() + .get(MetastoreConstants.TIMEDIM_RELATION + timedim); + if (StringUtils.isNotBlank(fallBackString)) { + break; + } + } + if (StringUtils.isBlank(fallBackString)) { + return null; + } + Matcher matcher = Pattern.compile("(.*?)\\+\\[(.*?),(.*?)\\]").matcher(fallBackString.replaceAll(WSPACE, "")); + if (!matcher.matches()) { + return null; + } + DateUtil.TimeDiff diff1 = DateUtil.TimeDiff.parseFrom(matcher.group(2).trim()); + DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim()); + String relatedTimeDim = matcher.group(1).trim(); + String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim); + return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate())) + .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build(); + } + + /** + * Checks how much data is completed for a column. 
+ * See this: {@link org.apache.lens.server.api.metastore.DataCompletenessChecker} + * + * @param cubeql + * @param cubeCol + * @param alias + * @param measureTag + * @param tagToMeasureOrExprMap + * @return + */ + public static boolean processCubeColForDataCompleteness(CubeQueryContext cubeql, String cubeCol, String alias, + Set<String> measureTag, Map<String, String> tagToMeasureOrExprMap) { + CubeMeasure column = cubeql.getCube().getMeasureByName(cubeCol); + if (column != null && column.getTags() != null) { + String dataCompletenessTag = column.getTags().get(MetastoreConstants.MEASURE_DATACOMPLETENESS_TAG); + //Checking if dataCompletenessTag is set for queried measure + if (dataCompletenessTag != null) { + measureTag.add(dataCompletenessTag); + String value = tagToMeasureOrExprMap.get(dataCompletenessTag); + if (value == null) { + tagToMeasureOrExprMap.put(dataCompletenessTag, alias); + } else { + value = value.concat(",").concat(alias); + tagToMeasureOrExprMap.put(dataCompletenessTag, value); + } + return true; + } + } + return false; + } + + /** + * This method extracts all the columns used in expressions (used in query) and evaluates each + * column separately for completeness + * + * @param cubeql + * @param measureTag + * @param tagToMeasureOrExprMap + */ + public static void processExpressionsForCompleteness(CubeQueryContext cubeql, Set<String> measureTag, + Map<String, String> tagToMeasureOrExprMap) { + boolean isExprProcessed; + String cubeAlias = cubeql.getAliasForTableName(cubeql.getCube().getName()); + for (String expr : cubeql.getQueriedExprsWithMeasures()) { + isExprProcessed = false; + for (ExpressionResolver.ExprSpecContext esc : cubeql.getExprCtx().getExpressionContext(expr, cubeAlias) + .getAllExprs()) { + if (esc.getTblAliasToColumns().get(cubeAlias) != null) { + for (String cubeCol : esc.getTblAliasToColumns().get(cubeAlias)) { + if (processCubeColForDataCompleteness(cubeql, cubeCol, expr, measureTag, tagToMeasureOrExprMap)) { + /* This is 
done to associate the expression with one of the dataCompletenessTag for the measures. + So, even if the expression is composed of measures with different dataCompletenessTags, we will be + determining the dataCompleteness from one of the measure and this expression is grouped with the + other queried measures that have the same dataCompletenessTag. */ + isExprProcessed = true; + break; + } + } + } + if (isExprProcessed) { + break; + } + } + } + } } + http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java deleted file mode 100644 index f18ae36..0000000 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimeRangeChecker.java +++ /dev/null @@ -1,240 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.lens.cube.parse; - -import static org.apache.hadoop.hive.ql.parse.HiveParser.*; - -import java.util.*; - -import org.apache.lens.cube.error.ColUnAvailableInTimeRange; -import org.apache.lens.cube.error.ColUnAvailableInTimeRangeException; -import org.apache.lens.cube.error.LensCubeErrorCode; -import org.apache.lens.cube.metadata.*; -import org.apache.lens.cube.metadata.join.JoinPath; -import org.apache.lens.cube.parse.join.AutoJoinContext; -import org.apache.lens.server.api.LensConfConstants; -import org.apache.lens.server.api.error.LensException; - -import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.plan.PlanUtils; - -import com.google.common.collect.Lists; -import lombok.extern.slf4j.Slf4j; - -@Slf4j -public class TimeRangeChecker implements ContextRewriter { - public TimeRangeChecker(Configuration conf) { - } - @Override - public void rewriteContext(CubeQueryContext cubeql) throws LensException { - if (cubeql.getCube() == null) { - return; - } - doColLifeValidation(cubeql); - doFactRangeValidation(cubeql); - } - private void extractTimeRange(CubeQueryContext cubeql) throws LensException { - // get time range - - // Time range should be direct child of where condition - // TOK_WHERE.TOK_FUNCTION.Identifier Or, it should be right hand child of - // AND condition TOK_WHERE.KW_AND.TOK_FUNCTION.Identifier - if (cubeql.getWhereAST() == null || cubeql.getWhereAST().getChildCount() < 1) { - throw new LensException(LensCubeErrorCode.NO_TIMERANGE_FILTER.getLensErrorInfo()); - } - searchTimeRanges(cubeql.getWhereAST(), cubeql, null, 0); - } - - private void searchTimeRanges(ASTNode root, CubeQueryContext cubeql, ASTNode parent, int childIndex) - throws LensException { - if (root == null) { - return; - } else if (root.getToken().getType() == TOK_FUNCTION) { - ASTNode fname = HQLParser.findNodeByPath(root, Identifier); - if (fname 
!= null && CubeQueryContext.TIME_RANGE_FUNC.equalsIgnoreCase(fname.getText())) { - processTimeRangeFunction(cubeql, root, parent, childIndex); - } - } else { - for (int i = 0; i < root.getChildCount(); i++) { - ASTNode child = (ASTNode) root.getChild(i); - searchTimeRanges(child, cubeql, root, i); - } - } - } - - private String getColumnName(ASTNode node) { - String column = null; - if (node.getToken().getType() == DOT) { - ASTNode colIdent = (ASTNode) node.getChild(1); - column = colIdent.getText().toLowerCase(); - } else if (node.getToken().getType() == TOK_TABLE_OR_COL) { - // Take child ident.totext - ASTNode ident = (ASTNode) node.getChild(0); - column = ident.getText().toLowerCase(); - } - return column; - } - - private void processTimeRangeFunction(CubeQueryContext cubeql, ASTNode timenode, ASTNode parent, int childIndex) - throws LensException { - TimeRange.TimeRangeBuilder builder = TimeRange.getBuilder(); - builder.astNode(timenode); - builder.parent(parent); - builder.childIndex(childIndex); - - String timeDimName = getColumnName((ASTNode) timenode.getChild(1)); - - if (!cubeql.getCube().getTimedDimensions().contains(timeDimName)) { - throw new LensException(LensCubeErrorCode.NOT_A_TIMED_DIMENSION.getLensErrorInfo(), timeDimName); - } - // Replace timeDimName with column which is used for partitioning. 
Assume - // the same column - // is used as a partition column in all storages of the fact - timeDimName = cubeql.getPartitionColumnOfTimeDim(timeDimName); - builder.partitionColumn(timeDimName); - - String fromDateRaw = PlanUtils.stripQuotes(timenode.getChild(2).getText()); - String toDateRaw = null; - if (timenode.getChildCount() > 3) { - ASTNode toDateNode = (ASTNode) timenode.getChild(3); - if (toDateNode != null) { - toDateRaw = PlanUtils.stripQuotes(timenode.getChild(3).getText()); - } - } - long currentTime = cubeql.getConf().getLong(LensConfConstants.QUERY_CURRENT_TIME_IN_MILLIS, 0); - Date now; - if (currentTime != 0) { - now = new Date(currentTime); - } else { - now = new Date(); - } - builder.fromDate(DateUtil.resolveDate(fromDateRaw, now)); - if (StringUtils.isNotBlank(toDateRaw)) { - builder.toDate(DateUtil.resolveDate(toDateRaw, now)); - } else { - builder.toDate(now); - } - - TimeRange range = builder.build(); - range.validate(); - cubeql.getTimeRanges().add(range); - } - - private void doColLifeValidation(CubeQueryContext cubeql) throws LensException, - ColUnAvailableInTimeRangeException { - Set<String> cubeColumns = cubeql.getColumnsQueriedForTable(cubeql.getCube().getName()); - if (cubeColumns == null || cubeColumns.isEmpty()) { - // Query doesn't have any columns from cube - return; - } - - for (String col : cubeql.getColumnsQueriedForTable(cubeql.getCube().getName())) { - CubeColumn column = cubeql.getCube().getColumnByName(col); - for (TimeRange range : cubeql.getTimeRanges()) { - if (column == null) { - if (!cubeql.getCube().getTimedDimensions().contains(col)) { - throw new LensException(LensCubeErrorCode.NOT_A_CUBE_COLUMN.getLensErrorInfo(), col); - } - continue; - } - if (!column.isColumnAvailableInTimeRange(range)) { - throwException(column); - } - } - } - - // Remove join paths that have columns with invalid life span - AutoJoinContext joinContext = cubeql.getAutoJoinCtx(); - if (joinContext == null) { - return; - } - // Get cube columns 
which are part of join chain - Set<String> joinColumns = joinContext.getAllJoinPathColumnsOfTable((AbstractCubeTable) cubeql.getCube()); - if (joinColumns == null || joinColumns.isEmpty()) { - return; - } - - // Loop over all cube columns part of join paths - for (String col : joinColumns) { - CubeColumn column = cubeql.getCube().getColumnByName(col); - for (TimeRange range : cubeql.getTimeRanges()) { - if (!column.isColumnAvailableInTimeRange(range)) { - log.info("Timerange queried is not in column life for {}, Removing join paths containing the column", column); - // Remove join paths containing this column - Map<Aliased<Dimension>, List<JoinPath>> allPaths = joinContext.getAllPaths(); - - for (Aliased<Dimension> dimension : allPaths.keySet()) { - List<JoinPath> joinPaths = allPaths.get(dimension); - Iterator<JoinPath> joinPathIterator = joinPaths.iterator(); - - while (joinPathIterator.hasNext()) { - JoinPath path = joinPathIterator.next(); - if (path.containsColumnOfTable(col, (AbstractCubeTable) cubeql.getCube())) { - log.info("Removing join path: {} as columns :{} is not available in the range", path, col); - joinPathIterator.remove(); - if (joinPaths.isEmpty()) { - // This dimension doesn't have any paths left - throw new LensException(LensCubeErrorCode.NO_JOIN_PATH.getLensErrorInfo(), - "No valid join path available for dimension " + dimension + " which would satisfy time range " - + range.getFromDate() + "-" + range.getToDate()); - } - } - } // End loop to remove path - - } // End loop for all paths - } - } // End time range loop - } // End column loop - } - - - private void throwException(CubeColumn column) throws ColUnAvailableInTimeRangeException { - - final Long availabilityStartTime = (column.getStartTimeMillisSinceEpoch().isPresent()) - ? column.getStartTimeMillisSinceEpoch().get() : null; - - final Long availabilityEndTime = column.getEndTimeMillisSinceEpoch().isPresent() - ? 
column.getEndTimeMillisSinceEpoch().get() : null; - - ColUnAvailableInTimeRange col = new ColUnAvailableInTimeRange(column.getName(), availabilityStartTime, - availabilityEndTime); - - throw new ColUnAvailableInTimeRangeException(col); - } - - private void doFactRangeValidation(CubeQueryContext cubeql) { - Iterator<CandidateFact> iter = cubeql.getCandidateFacts().iterator(); - while (iter.hasNext()) { - CandidateFact cfact = iter.next(); - List<TimeRange> invalidTimeRanges = Lists.newArrayList(); - for (TimeRange timeRange : cubeql.getTimeRanges()) { - if (!cfact.isValidForTimeRange(timeRange)) { - invalidTimeRanges.add(timeRange); - } - } - if (!invalidTimeRanges.isEmpty()){ - cubeql.addFactPruningMsgs(cfact.fact, CandidateTablePruneCause.factNotAvailableInRange(invalidTimeRanges)); - log.info("Not considering {} as it's not available for time ranges: {}", cfact, invalidTimeRanges); - iter.remove(); - } - } - cubeql.pruneCandidateFactSet(CandidateTablePruneCause.CandidateTablePruneCode.FACT_NOT_AVAILABLE_IN_RANGE); - } -} http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java new file mode 100644 index 0000000..7f07dbc --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionCandidate.java @@ -0,0 +1,295 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.lens.cube.parse;
+
+import java.util.*;
+
+import org.apache.lens.cube.metadata.FactPartition;
+import org.apache.lens.cube.metadata.TimeRange;
+import org.apache.lens.server.api.error.LensException;
+
+import com.google.common.collect.Maps;
+
+/**
+ * Represents a union of a list of child candidates.
+ *
+ * A queried time range is split among the children (cheapest child first) and the per-range split is cached.
+ * Coverage, completeness, cost and partitions are then answered by combining the children's answers.
+ */
+public class UnionCandidate implements Candidate {
+
+  /**
+   * Caching start and end time calculated for this candidate as it may have many child candidates.
+   */
+  Date startTime = null;
+  Date endTime = null;
+  /**
+   * String form of this candidate, built on the first call to toString() and reused afterwards.
+   */
+  String toStr;
+  CubeQueryContext cubeql;
+  /**
+   * List of child candidates that will be union-ed
+   */
+  private final List<Candidate> childCandidates;
+  private QueryAST queryAst;
+  /**
+   * Cache of the split computed for each queried time range: queried range -> (child -> range given to the child).
+   */
+  private final Map<TimeRange, Map<Candidate, TimeRange>> splitTimeRangeMap = Maps.newHashMap();
+
+  /**
+   * @param childCandidates candidates to be union-ed. The list is kept by reference (not copied) and is sorted
+   *                        in place by cost when a time range is split among the children.
+   * @param cubeql          query context; its time ranges are used by {@link #getCost()}.
+   */
+  public UnionCandidate(List<Candidate> childCandidates, CubeQueryContext cubeql) {
+    this.childCandidates = childCandidates;
+    this.cubeql = cubeql;
+  }
+
+  @Override
+  public Set<Integer> getAnswerableMeasurePhraseIndices() {
+    // All children in the UnionCandidate have the same queryable measures, so the first child's answer is used.
+    return getChildren().iterator().next().getAnswerableMeasurePhraseIndices();
+  }
+
+  /**
+   * @param timeRange queried time range
+   * @return true only if every child that received a part of the range can cover its part
+   */
+  @Override
+  public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException {
+    Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+    for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+      if (!entry.getKey().isTimeRangeCoverable(entry.getValue())) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public Collection<String> getColumns() {
+    // In UnionCandidate all columns are same, return the columns
+    // of first child
+    return childCandidates.iterator().next().getColumns();
+  }
+
+  /**
+   * @return the earliest start time among the children; computed once and cached in {@link #startTime}
+   */
+  @Override
+  public Date getStartTime() {
+    // Note: concurrent calls are not handled specifically (this should not be a problem even if we do
+    // get concurrent calls).
+    if (startTime == null) {
+      Date minStartTime = childCandidates.get(0).getStartTime();
+      for (Candidate child : childCandidates) {
+        Date childStart = child.getStartTime();
+        if (childStart.before(minStartTime)) {
+          minStartTime = childStart;
+        }
+      }
+      startTime = minStartTime;
+    }
+    return startTime;
+  }
+
+  /**
+   * @return the latest end time among the children; computed once and cached in {@link #endTime}
+   */
+  @Override
+  public Date getEndTime() {
+    if (endTime == null) {
+      Date maxEndTime = childCandidates.get(0).getEndTime();
+      for (Candidate child : childCandidates) {
+        Date childEnd = child.getEndTime();
+        if (childEnd.after(maxEndTime)) {
+          maxEndTime = childEnd;
+        }
+      }
+      endTime = maxEndTime;
+    }
+    return endTime;
+  }
+
+  /**
+   * @return for every time range of the query, the cost of each participating child weighted by the fraction
+   * (in milliseconds) of that range assigned to the child, summed up
+   */
+  @Override
+  public double getCost() {
+    double cost = 0.0;
+    for (TimeRange timeRange : cubeql.getTimeRanges()) {
+      for (Map.Entry<Candidate, TimeRange> entry : getTimeRangeSplit(timeRange).entrySet()) {
+        cost += entry.getKey().getCost() * entry.getValue().milliseconds() / timeRange.milliseconds();
+      }
+    }
+    return cost;
+  }
+
+  /**
+   * @return true if the given candidate equals this candidate or is contained in any child
+   */
+  @Override
+  public boolean contains(Candidate candidate) {
+    if (this.equals(candidate)) {
+      return true;
+    }
+    for (Candidate child : childCandidates) {
+      if (child.contains(candidate)) {
+        return true;
+      }
+    }
+    return false;
+  }
+
+  @Override
+  public Collection<Candidate> getChildren() {
+    return childCandidates;
+  }
+
+  /**
+   * Evaluates completeness of every child for the part of the time range assigned to it.
+   *
+   * @param timeRange         queried time range; it is split among the children
+   * @param parentTimeRange   passed unchanged to every child
+   * @param failOnPartialData passed unchanged to every child
+   * @return true only if every participating child returned true
+   */
+  @Override
+  public boolean evaluateCompleteness(TimeRange timeRange, TimeRange parentTimeRange, boolean failOnPartialData)
+    throws LensException {
+    Map<Candidate, TimeRange> candidateRange = getTimeRangeSplit(timeRange);
+    boolean ret = true;
+    for (Map.Entry<Candidate, TimeRange> entry : candidateRange.entrySet()) {
+      // '&=' does not short-circuit: every child is evaluated even after one has returned false.
+      ret &= entry.getKey().evaluateCompleteness(entry.getValue(), parentTimeRange, failOnPartialData);
+    }
+    return ret;
+  }
+
+  /**
+   * @return a new set holding the participating partitions of all children
+   */
+  @Override
+  public Set<FactPartition> getParticipatingPartitions() {
+    Set<FactPartition> factPartitionSet = new HashSet<>();
+    for (Candidate c : childCandidates) {
+      factPartitionSet.addAll(c.getParticipatingPartitions());
+    }
+    return factPartitionSet;
+  }
+
+  /**
+   * @return true only if every child can evaluate the expression
+   */
+  @Override
+  public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) {
+    for (Candidate cand : childCandidates) {
+      if (!cand.isExpressionEvaluable(expr)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  @Override
+  public String toString() {
+    if (this.toStr == null) {
+      this.toStr = getToString();
+    }
+    return this.toStr;
+  }
+
+  /**
+   * Builds "UNION[child1, child2, ...]".
+   *
+   * A StringJoiner is used so that an empty child list gives "UNION[]"; unconditionally deleting the last two
+   * characters (the trailing separator) would cut into the "UNION[" prefix in that case.
+   */
+  private String getToString() {
+    StringJoiner joiner = new StringJoiner(", ", "UNION[", "]");
+    for (Candidate candidate : childCandidates) {
+      joiner.add(candidate.toString());
+    }
+    return joiner.toString();
+  }
+
+  /**
+   * Splits the parent time range for each candidate.
+   * The child list is sorted in place by cost, so cheaper children pick their part of the range first.
+   *
+   * @param timeRange time range to be split
+   * @return map from each participating child to the part of the range assigned to it; children that got no
+   * part are absent from the map
+   */
+  private Map<Candidate, TimeRange> splitTimeRangeForChildren(TimeRange timeRange) {
+    childCandidates.sort(Comparator.comparing(Candidate::getCost));
+    Map<Candidate, TimeRange> childrenTimeRangeMap = new HashMap<>();
+    // Parts of the parent range that are still unassigned; starts with the whole range.
+    Set<TimeRange> ranges = new HashSet<>();
+    ranges.add(timeRange);
+    for (Candidate c : childCandidates) {
+      TimeRange.TimeRangeBuilder builder = getClonedBuilder(timeRange);
+      TimeRange tr = resolveTimeRangeForChildren(c, ranges, builder);
+      if (tr != null) {
+        // If the time range is not null it means this child candidate is valid for this union candidate.
+        childrenTimeRangeMap.put(c, tr);
+      }
+    }
+    return childrenTimeRangeMap;
+  }
+
+  /**
+   * @return the split of the given range among the children, computed on first use and cached per range
+   */
+  private Map<Candidate, TimeRange> getTimeRangeSplit(TimeRange range) {
+    return splitTimeRangeMap.computeIfAbsent(range, this::splitTimeRangeForChildren);
+  }
+
+  /**
+   * Resolves the time range for this candidate based on overlap.
+   *
+   * The first range in the set that overlaps the candidate's [start, end) is taken. That range is removed from
+   * the set and whatever part of it lies before or after the candidate's share is added back to the set.
+   *
+   * @param candidate : Candidate for which the time range is to be calculated
+   * @param ranges    : Set of time ranges from which one has to be chosen.
+   * @param builder   : TimeRange builder created by the common AST.
+   * @return Calculated timeRange for the candidate. If it returns null then there is no suitable time range split
+   * for this candidate. This is the correct behaviour because a union candidate can have non participating child
+   * candidates for the parent time range.
+   */
+  private TimeRange resolveTimeRangeForChildren(Candidate candidate, Set<TimeRange> ranges,
+    TimeRange.TimeRangeBuilder builder) {
+    long candidateStart = candidate.getStartTime().getTime();
+    long candidateEnd = candidate.getEndTime().getTime();
+    Iterator<TimeRange> it = ranges.iterator();
+    Set<TimeRange> newTimeRanges = new HashSet<>();
+    TimeRange ret = null;
+    while (it.hasNext()) {
+      TimeRange range = it.next();
+      long rangeFrom = range.getFromDate().getTime();
+      long rangeTo = range.getToDate().getTime();
+      // Check for out of range
+      if (candidateStart >= rangeTo || candidateEnd <= rangeFrom) {
+        continue;
+      }
+      // This means overlap: the share starts at the later of the two starts and ends at the earlier of the two ends.
+      builder.fromDate(candidateStart <= rangeFrom ? range.getFromDate() : candidate.getStartTime());
+      builder.toDate(candidateEnd <= rangeTo ? candidate.getEndTime() : range.getToDate());
+      // Remove the time range and add back whatever is left of it.
+      it.remove();
+      ret = builder.build();
+      if (ret.getFromDate().getTime() != range.getFromDate().getTime()) {
+        // Part of the range before the candidate's share stays unassigned.
+        TimeRange.TimeRangeBuilder b1 = getClonedBuilder(ret);
+        b1.fromDate(range.getFromDate());
+        b1.toDate(ret.getFromDate());
+        newTimeRanges.add(b1.build());
+      }
+      checkAndUpdateNewTimeRanges(ret, range, newTimeRanges);
+      break;
+    }
+    // Added only after the iteration is over, so the set is not modified while it is being iterated.
+    ranges.addAll(newTimeRanges);
+    return ret;
+  }
+
+  /**
+   * Adds the part of range that lies after ret (if any) to newTimeRanges.
+   */
+  private void checkAndUpdateNewTimeRanges(TimeRange ret, TimeRange range, Set<TimeRange> newTimeRanges) {
+    if (ret.getToDate().getTime() < range.getToDate().getTime()) {
+      TimeRange.TimeRangeBuilder b2 = getClonedBuilder(ret);
+      b2.fromDate(ret.getToDate());
+      b2.toDate(range.getToDate());
+      newTimeRanges.add(b2.build());
+    }
+  }
+
+  /**
+   * @return a new builder carrying the AST node, child index, parent and partition column of the given time
+   * range; from/to dates are left for the caller to set
+   */
+  private TimeRange.TimeRangeBuilder getClonedBuilder(TimeRange timeRange) {
+    TimeRange.TimeRangeBuilder builder = new TimeRange.TimeRangeBuilder();
+    builder.astNode(timeRange.getAstNode());
+    builder.childIndex(timeRange.getChildIndex());
+    builder.parent(timeRange.getParent());
+    builder.partitionColumn(timeRange.getPartitionColumn());
+    return builder;
+  }
+}
http://git-wip-us.apache.org/repos/asf/lens/blob/ae83caae/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java
deleted file mode 100644
index e6ee989..0000000
--- 
a/lens-cube/src/main/java/org/apache/lens/cube/parse/UnionHQLContext.java +++ /dev/null @@ -1,55 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.lens.cube.parse; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.lens.server.api.error.LensException; - -import org.apache.commons.lang.NotImplementedException; - -import lombok.AllArgsConstructor; -import lombok.RequiredArgsConstructor; - -@AllArgsConstructor -@RequiredArgsConstructor -public abstract class UnionHQLContext extends SimpleHQLContext { - protected final CubeQueryContext query; - protected final CandidateFact fact; - - List<HQLContextInterface> hqlContexts = new ArrayList<>(); - - public void setHqlContexts(List<HQLContextInterface> hqlContexts) throws LensException { - this.hqlContexts = hqlContexts; - StringBuilder queryParts = new StringBuilder("("); - String sep = ""; - for (HQLContextInterface ctx : hqlContexts) { - queryParts.append(sep).append(ctx.toHQL()); - sep = " UNION ALL "; - } - setFrom(queryParts.append(") ").append(query.getCube().getName()).toString()); - } - - @Override - public String getWhere() { - throw new NotImplementedException("Not Implemented"); - } -}