http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationCandidate.java new file mode 100644 index 0000000..a359d86 --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationCandidate.java @@ -0,0 +1,382 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.cube.parse; + +import static java.util.stream.Collectors.joining; +import static java.util.stream.Collectors.toMap; + +import static org.apache.lens.cube.metadata.DateUtil.formatAbsDate; +import static org.apache.lens.cube.metadata.MetastoreUtil.getStringLiteralAST; + +import static org.apache.hadoop.hive.ql.parse.HiveParser.Identifier; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_FROM; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_HAVING; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_INSERT; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_ORDERBY; +import static org.apache.hadoop.hive.ql.parse.HiveParser.TOK_SELEXPR; + +import java.util.Collection; +import java.util.Comparator; +import java.util.Date; +import java.util.Map; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; +import java.util.function.Predicate; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import org.apache.lens.cube.metadata.Cube; +import org.apache.lens.cube.metadata.CubeColumn; +import org.apache.lens.cube.metadata.CubeInterface; +import org.apache.lens.cube.metadata.FactPartition; +import org.apache.lens.cube.metadata.MetastoreUtil; +import org.apache.lens.cube.metadata.Segment; +import org.apache.lens.cube.metadata.Segmentation; +import org.apache.lens.cube.metadata.TimeRange; +import org.apache.lens.server.api.error.LensException; + +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; +import org.apache.hadoop.hive.ql.parse.ASTNode; + +import org.antlr.runtime.CommonToken; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import lombok.Getter; + +/** + * Created on 09/03/17. 
+ */ +public class SegmentationCandidate implements Candidate { + + Collection<String> columns; + @Getter + private final CubeQueryContext cubeQueryContext; + private Segmentation segmentation; + private Map<String, Cube> cubesOfSegmentation; + Map<String, CubeQueryContext> cubeQueryContextMap; + @Getter + private final Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet(); + private Map<TimeRange, TimeRange> queriedRangeToMyRange = Maps.newHashMap(); + + SegmentationCandidate(CubeQueryContext cubeQueryContext, Segmentation segmentation) throws LensException { + this.cubeQueryContext = cubeQueryContext; + this.segmentation = segmentation; + cubesOfSegmentation = Maps.newHashMap(); + cubeQueryContextMap = Maps.newHashMap(); + for (Segment segment : segmentation.getSegments()) { + // assuming only base cubes in segmentation + cubesOfSegmentation.put(segment.getName(), (Cube) getCubeMetastoreClient().getCube(segment.getName())); + } + } + + + public SegmentationCandidate explode() throws LensException { + return this; + } + + private static <T> Predicate<T> not(Predicate<T> predicate) { + return predicate.negate(); + } + + boolean rewriteInternal(Configuration conf, HiveConf hconf) throws LensException { + CubeInterface cube = getCube(); + if (cube == null) { + return false; + } + for (Segment segment : segmentation.getSegments()) { + // assuming only base cubes in segmentation + Cube innerCube = (Cube) getCubeMetastoreClient().getCube(segment.getName()); + cubesOfSegmentation.put(segment.getName(), innerCube); + Set<QueriedPhraseContext> notAnswerable = cubeQueryContext.getQueriedPhrases().stream() + .filter(not(this::isPhraseAnswerable)).collect(Collectors.toSet()); + // create ast + ASTNode ast = MetastoreUtil.copyAST(cubeQueryContext.getAst(), + astNode -> { + // replace time range + for (Map.Entry<TimeRange, TimeRange> timeRangeTimeRangeEntry : queriedRangeToMyRange.entrySet()) { + TimeRange queriedTimeRange = timeRangeTimeRangeEntry.getKey(); + TimeRange timeRange = timeRangeTimeRangeEntry.getValue(); + if (astNode.getParent() == queriedTimeRange.getAstNode()) { + if (astNode.getChildIndex() == 2) { + return Pair.of(getStringLiteralAST(formatAbsDate(timeRange.getFromDate())), false); + } else if (astNode.getChildIndex() == 3) { + return Pair.of(getStringLiteralAST(formatAbsDate(timeRange.getToDate())), false); + } + break; + } + } + // else, replace unanswerable measures + for (QueriedPhraseContext phraseContext : notAnswerable) { + if ((astNode.getType() != TOK_SELEXPR && astNode == phraseContext.getExprAST()) + || astNode.getParent() == phraseContext.getExprAST()) { + return Pair.of(MetastoreUtil.copyAST(UnionQueryWriter.DEFAULT_MEASURE_AST), false); + } + } + // else, copy token replacing cube name and ask for recursion on child nodes + // this is hard copy. 
Default is soft copy, which is new ASTNode(astNode) + // Soft copy retains the token object inside it, hard copy copies token object + return Pair.of(new ASTNode(new CommonToken(astNode.getToken())), true); + }); + addCubeNameAndAlias(ast, innerCube); + trimHavingAndOrderby(ast, innerCube); + CubeQueryRewriter rewriter = new CubeQueryRewriter(conf, hconf); + CubeQueryContext ctx = rewriter.rewrite(ast); + cubeQueryContextMap.put(segment.getName(), ctx); + if (!ctx.getCandidates().isEmpty()) { + ctx.pickCandidateToQuery(); + for (StorageCandidate storageCandidate : CandidateUtil.getStorageCandidates(ctx.getPickedCandidate())) { + for (Map.Entry<TimeRange, TimeRange> timeRangeTimeRangeEntry : queriedRangeToMyRange.entrySet()) { + TimeRange timeRange = timeRangeTimeRangeEntry.getKey(); + TimeRange queriedTimeRange = timeRangeTimeRangeEntry.getValue(); + Set<FactPartition> rangeToPartition = storageCandidate.getRangeToPartitions().get(timeRange); + if (rangeToPartition != null) { + storageCandidate.getRangeToPartitions().put(queriedTimeRange, rangeToPartition); + } + String extraWhere = storageCandidate.getRangeToExtraWhereFallBack().get(timeRange); + if (extraWhere != null) { + storageCandidate.getRangeToExtraWhereFallBack().put(queriedTimeRange, extraWhere); + } + } + } + } + } + return areCandidatesPicked(); + } + + private void addCubeNameAndAlias(ASTNode ast, Cube innerCube) { + ASTNode cubeNameNode = findCubeNameNode(HQLParser.findNodeByPath(ast, TOK_FROM)); + assert cubeNameNode != null; + ASTNode tabrefNode = (ASTNode) cubeNameNode.getParent().getParent(); + cubeNameNode.getToken().setText(innerCube.getName()); + ASTNode aliasNode = new ASTNode(new CommonToken(Identifier, + getCubeQueryContext().getAliasForTableName(getCube().getName()))); + if (tabrefNode.getChildCount() > 1) { + tabrefNode.setChild(1, aliasNode); + } else { + tabrefNode.addChild(aliasNode); + } + } + + private ASTNode findCubeNameNode(ASTNode node) { + if (node.getType() == Identifier) { + if (node.getText().equalsIgnoreCase(getCubeQueryContext().getCube().getName())) { + return node; + } else { + return null; // should never come here. 
+ } + } + return node.getChildren().stream().map(ASTNode.class::cast).map(this::findCubeNameNode).filter(Objects::nonNull) + .findFirst().orElse(null); + } + + private void trimHavingAndOrderby(ASTNode ast, Cube innerCube) { + ASTNode havingAst = HQLParser.findNodeByPath(ast, TOK_INSERT, TOK_HAVING); + if (havingAst != null) { + ASTNode newHavingAst = HQLParser.trimHavingAst(havingAst, innerCube.getAllFieldNames()); + if (newHavingAst != null) { + havingAst.getParent().setChild(havingAst.getChildIndex(), newHavingAst); + } else { + havingAst.getParent().deleteChild(havingAst.getChildIndex()); + } + } + ASTNode orderByAst = HQLParser.findNodeByPath(ast, TOK_INSERT, TOK_ORDERBY); + if (orderByAst != null) { + ASTNode newOrderByAst = HQLParser.trimOrderByAst(orderByAst, innerCube.getAllFieldNames()); + if (newOrderByAst != null) { + orderByAst.getParent().setChild(orderByAst.getChildIndex(), newOrderByAst); + } else { + orderByAst.getParent().deleteChild(orderByAst.getChildIndex()); + } + } + } + + + public SegmentationCandidate(SegmentationCandidate segmentationCandidate) throws LensException { + this(segmentationCandidate.cubeQueryContext, segmentationCandidate.segmentation); + + } + + @Override + public Collection<String> getColumns() { + if (columns == null) { + columns = cubeStream().map(Cube::getAllFieldNames) + .reduce(Sets::intersection).orElseGet(Sets::newHashSet) + .stream().collect(Collectors.toSet()); + } + return columns; + } + + @Override + public Date getStartTime() { + return segmentation.getStartTime(); + } + + @Override + public Date getEndTime() { + return segmentation.getEndTime(); + } + + @Override + public double getCost() { + return segmentation.weight(); + } + + @Override + public boolean contains(Candidate candidate) { + return areCandidatesPicked() && getChildren().contains(candidate); + } + + @Override + public Collection<Candidate> getChildren() { + return candidateStream().collect(Collectors.toSet()); + } + + @Override + public boolean isTimeRangeCoverable(TimeRange timeRange) throws LensException { + return true; + } + + @Override + public boolean evaluateCompleteness(TimeRange timeRange, TimeRange queriedTimeRange, boolean failOnPartialData) + throws LensException { + queriedRangeToMyRange.put(queriedTimeRange, timeRange); + return true; + } + + @Override + public Set<FactPartition> getParticipatingPartitions() { + Set<FactPartition> partitionSet = Sets.newHashSet(); + for (CubeQueryContext cubeQueryContext : cubeQueryContextMap.values()) { + if (cubeQueryContext.getPickedCandidate() != null) { + partitionSet.addAll(cubeQueryContext.getPickedCandidate().getParticipatingPartitions()); + } + } + return partitionSet; + } + + @Override + public boolean isExpressionEvaluable(ExpressionResolver.ExpressionContext expr) { + // expression context is specific to cubequerycontext. So for segmentation candidate, + // I can't ask my children to check this context for evaluability. 
+ return cubeStream() + .map(cube -> cube.getExpressionByName(expr.getExprCol().getName())) + .allMatch(Predicate.isEqual(expr.getExprCol())); + } + + private boolean areCandidatesPicked() { + return candidateStream().count() == cubesOfSegmentation.size(); + } + + private Stream<Candidate> candidateStream() { + return contextStream().map(CubeQueryContext::getPickedCandidate).filter(Objects::nonNull); + } + + private Stream<CubeQueryContext> contextStream() { + return cubeQueryContextMap.values().stream(); + } + + private Stream<Cube> cubeStream() { + return cubesOfSegmentation.values().stream(); + } + + @Override + public boolean isExpressionEvaluable(String expr) { + return candidateStream().allMatch(cand -> cand.isExpressionEvaluable(expr)); + } + + @Override + public boolean isDimAttributeEvaluable(String dim) throws LensException { + if (areCandidatesPicked()) { + for (Candidate childCandidate : (Iterable<Candidate>) candidateStream()::iterator) { + if (!childCandidate.isDimAttributeEvaluable(dim)) { + return false; + } + } + return true; + } + return hasColumn(dim); + } + + @Override + public Candidate copy() throws LensException { + return new SegmentationCandidate(this); + } + + @Override + public boolean isPhraseAnswerable(QueriedPhraseContext phrase) { + // TODO consider measure start time etc + return getColumns().containsAll(phrase.getColumns()); + } + + @Override + public Optional<Date> getColumnStartTime(String column) { + if (areCandidatesPicked()) { + return candidateStream() + .map(c -> c.getColumnStartTime(column)) + .filter(Optional::isPresent) + .map(Optional::get) + .min(Comparator.naturalOrder()); + } else { + return cubeStream() + .map(cube -> cube.getColumnByName(column)) + .map(CubeColumn::getStartTime).filter(Objects::nonNull) + .min(Comparator.naturalOrder()); + } + } + + @Override + public Optional<Date> getColumnEndTime(String column) { + if (areCandidatesPicked()) { + return candidateStream() + .map(c -> c.getColumnEndTime(column)) + .filter(Optional::isPresent) // use flatmap(Optional::stream) after migration to java9 + .map(Optional::get) // https://bugs.openjdk.java.net/browse/JDK-8050820 + .max(Comparator.naturalOrder()); + } else { + return cubeStream() + .map(cube -> cube.getColumnByName(column)) + .map(CubeColumn::getEndTime).filter(Objects::nonNull) + .max(Comparator.naturalOrder()); + } + } + + public void addAnswerableMeasurePhraseIndices(int index) { + answerableMeasurePhraseIndices.add(index); + } + + + public String toString() { + Collector<CharSequence, ?, String> collector = joining("; ", "SEG[", "]"); + if (areCandidatesPicked()) { + return candidateStream().map(Candidate::toString).collect(collector); + } else { + return cubeStream().map(Cube::getName).collect(collector); + } + } + + Map<String, PruneCauses<Candidate>> getPruneCausesOfFailedContexts() { + return cubeQueryContextMap.entrySet().stream().filter(entry -> entry.getValue().getPickedCandidate() == null) + .collect(toMap(Map.Entry::getKey, entry -> entry.getValue().getStoragePruningMsgs())); + } +}
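[Editorial aside, not part of the patch] The copyAST callback in rewriteInternal above relies on the distinction its comment describes: a soft copy (new ASTNode(astNode)) shares the underlying token object, while a hard copy (new ASTNode(new CommonToken(astNode.getToken()))) duplicates it. A minimal sketch of why the per-segment ASTs need hard copies, assuming only hive-exec and antlr-runtime on the classpath (the class name AstCopyDemo is illustrative):

import org.antlr.runtime.CommonToken;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.HiveParser;

public class AstCopyDemo {
  public static void main(String[] args) {
    ASTNode original = new ASTNode(new CommonToken(HiveParser.Identifier, "basecube"));

    // Soft copy: wraps the same Token instance as the original node.
    ASTNode softCopy = new ASTNode(original);
    // Hard copy: duplicates the Token, as rewriteInternal does for each segment.
    ASTNode hardCopy = new ASTNode(new CommonToken(original.getToken()));

    softCopy.getToken().setText("seg_cube1");
    System.out.println(original.getText()); // "seg_cube1" -- the original was mutated too

    hardCopy.getToken().setText("seg_cube2");
    System.out.println(original.getText()); // still "seg_cube1" -- the hard copy is independent
  }
}

Because addCubeNameAndAlias renames the cube by mutating the token text in place, a shared token would leak one segment's rename into the original query AST and into every other segment's copy.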
http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationInnerRewriter.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationInnerRewriter.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationInnerRewriter.java new file mode 100644 index 0000000..b6ef3dc --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/SegmentationInnerRewriter.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.cube.parse; + +import static org.apache.lens.cube.parse.CandidateTablePruneCause.segmentationPruned; + +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.lens.server.api.error.LensException; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hive.conf.HiveConf; + +import lombok.RequiredArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +/** + * Created on 11/04/17. 
+ */ +@RequiredArgsConstructor +@Slf4j +public class SegmentationInnerRewriter implements ContextRewriter { + private final Configuration conf; + private final HiveConf hconf; + + @Override + public void rewriteContext(CubeQueryContext cubeql) throws LensException { + Exploder exploder = new Exploder(cubeql); + cubeql.getCandidates().removeIf(exploder::shouldBeRemoved); + } + @RequiredArgsConstructor + private class Exploder { + private final CubeQueryContext cubeql; + private boolean shouldBeRemoved(Candidate candidate) { + if (candidate.getChildren() == null) { + return false; + } else if (candidate instanceof SegmentationCandidate) { + SegmentationCandidate segCand = ((SegmentationCandidate) candidate); + try { + boolean areCandidatesPicked = segCand.rewriteInternal(conf, hconf); + if (!areCandidatesPicked) { + Map<String, PruneCauses<Candidate>> pruneCauses = segCand.getPruneCausesOfFailedContexts(); + Map<String, String> briefCauses = pruneCauses.entrySet().stream() + .collect(Collectors.toMap(Map.Entry::getKey, entry -> entry.getValue().getBriefCause())); + log.info("Segmentation candidate {} not picked because: {}", candidate, briefCauses); + cubeql.addCandidatePruningMsg(candidate, segmentationPruned(briefCauses)); + } + return !areCandidatesPicked; + } catch (LensException e) { + log.info("Segmentation candidate {} not picked because: {}", candidate, e.getMessage()); + cubeql.addCandidatePruningMsg(candidate, segmentationPruned(e)); + return true; + } + } else { + return candidate.getChildren().stream().anyMatch(this::shouldBeRemoved); + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/SimpleHQLContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/SimpleHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/SimpleHQLContext.java index 77ebe82..24d0cda 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/SimpleHQLContext.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/SimpleHQLContext.java @@ -18,9 +18,16 @@ */ package org.apache.lens.cube.parse; +import java.util.List; + import org.apache.lens.server.api.error.LensException; +import org.apache.commons.lang.StringUtils; + +import com.google.common.collect.Lists; +import lombok.AccessLevel; import lombok.Data; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** @@ -30,37 +37,12 @@ import lombok.extern.slf4j.Slf4j; */ @Slf4j @Data -public abstract class SimpleHQLContext implements HQLContextInterface { - - private String select; +public abstract class SimpleHQLContext implements QueryWriter { + private String prefix; private String from; private String where; - private String groupby; - private String orderby; - private String having; - private Integer limit; - - SimpleHQLContext() { - } - - SimpleHQLContext(String select, String from, String where, String groupby, String orderby, String having, - Integer limit) { - this.select = select; - this.from = from; - this.where = where; - this.groupby = groupby; - this.orderby = orderby; - this.having = having; - this.limit = limit; - } - - SimpleHQLContext(String select, String groupby, String orderby, String having, Integer limit) { - this.select = select; - this.groupby = groupby; - this.orderby = orderby; - this.having = having; - this.limit = limit; - } + @Getter(AccessLevel.PUBLIC) + protected final QueryAST queryAst; /** * Set all missing expressions of HQL context. 
@@ -70,11 +52,50 @@ public abstract class SimpleHQLContext implements HQLContextInterface { * * @throws LensException */ - protected void setMissingExpressions() throws LensException { - } + protected abstract void setMissingExpressions() throws LensException; public String toHQL() throws LensException { setMissingExpressions(); - return CandidateUtil.buildHQLString(select, from, where, groupby, orderby, having, limit); + return buildHQLString(); + } + + private static final String BASE_QUERY_FORMAT = "SELECT %s FROM %s"; + + private String buildHQLString() { + return buildHQLString(prefix, getQueryAst().getSelectString(), from, where, getQueryAst().getGroupByString(), + getQueryAst().getOrderByString(), getQueryAst().getHavingString(), getQueryAst().getLimitValue()); + } + private static String buildHQLString(String prefix, String select, String from, String where, + String groupby, String orderby, String having, Integer limit) { + StringBuilder queryFormat = new StringBuilder(); + List<String> qstrs = Lists.newArrayList(); + if (StringUtils.isNotBlank(prefix)) { + queryFormat.append("%s"); + qstrs.add(prefix); + } + queryFormat.append(BASE_QUERY_FORMAT); + qstrs.add(select); + qstrs.add(from); + if (StringUtils.isNotBlank(where)) { + queryFormat.append(" WHERE %s"); + qstrs.add(where); + } + if (StringUtils.isNotBlank(groupby)) { + queryFormat.append(" GROUP BY %s"); + qstrs.add(groupby); + } + if (StringUtils.isNotBlank(having)) { + queryFormat.append(" HAVING %s"); + qstrs.add(having); + } + if (StringUtils.isNotBlank(orderby)) { + queryFormat.append(" ORDER BY %s"); + qstrs.add(orderby); + } + if (limit != null) { + queryFormat.append(" LIMIT %s"); + qstrs.add(String.valueOf(limit)); + } + return String.format(queryFormat.toString(), qstrs.toArray(new Object[qstrs.size()])); } } http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java index 95e3c95..1e54f13 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidate.java @@ -1,4 +1,4 @@ -/** +/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. 
See the NOTICE file * distributed with this work for additional information @@ -18,25 +18,41 @@ */ package org.apache.lens.cube.parse; +import static java.util.Comparator.naturalOrder; + import static org.apache.lens.cube.parse.CandidateTablePruneCause.CandidateTablePruneCode; import static org.apache.lens.cube.parse.CandidateTablePruneCause.SkipUpdatePeriodCode; import static org.apache.lens.cube.parse.CandidateTablePruneCause.timeDimNotSupported; import static org.apache.lens.cube.parse.StorageUtil.getFallbackRange; -import static org.apache.lens.cube.parse.StorageUtil.joinWithAnd; import static org.apache.lens.cube.parse.StorageUtil.processCubeColForDataCompleteness; import static org.apache.lens.cube.parse.StorageUtil.processExpressionsForCompleteness; import java.text.DateFormat; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.LinkedHashMap; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.TimeZone; +import java.util.TreeSet; +import java.util.stream.Stream; import org.apache.lens.cube.metadata.AbstractCubeTable; import org.apache.lens.cube.metadata.CubeFactTable; import org.apache.lens.cube.metadata.CubeInterface; -import org.apache.lens.cube.metadata.CubeMetastoreClient; import org.apache.lens.cube.metadata.DateUtil; import org.apache.lens.cube.metadata.Dimension; import org.apache.lens.cube.metadata.FactPartition; +import org.apache.lens.cube.metadata.MetastoreConstants; import org.apache.lens.cube.metadata.MetastoreUtil; import org.apache.lens.cube.metadata.TimeRange; import org.apache.lens.cube.metadata.UpdatePeriod; @@ -44,16 +60,11 @@ import org.apache.lens.server.api.error.LensException; import org.apache.lens.server.api.metastore.DataCompletenessChecker; import org.apache.commons.lang.StringUtils; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hive.ql.lib.Node; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.parse.ASTNode; -import org.apache.hadoop.hive.ql.parse.HiveParser; import org.apache.hadoop.hive.ql.session.SessionState; -import org.antlr.runtime.CommonToken; - import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import com.google.common.collect.Sets; import lombok.Getter; import lombok.Setter; @@ -67,9 +78,8 @@ public class StorageCandidate implements Candidate, CandidateTable { // TODO union : Put comments on member variables. @Getter - private final CubeQueryContext cubeql; + private final CubeQueryContext cubeQueryContext; private final String processTimePartCol; - private final CubeMetastoreClient client; private final String completenessPartCol; private final float completenessThreshold; @@ -101,7 +111,9 @@ public class StorageCandidate implements Candidate, CandidateTable { @Getter @Setter Map<String, SkipUpdatePeriodCode> updatePeriodRejectionCause; - private Configuration conf = null; + @Getter + Set<Dimension> queriedDims = Sets.newHashSet(); + private Collection<StorageCandidate> periodSpecificStorageCandidates; /** * This map holds Tags (A tag refers to one or more measures) that have incomplete (below configured threshold) data. 
@@ -120,16 +132,10 @@ public class StorageCandidate implements Candidate, CandidateTable { @Getter private String storageTable; @Getter - @Setter - private QueryAST queryAst; - @Getter private Map<TimeRange, Set<FactPartition>> rangeToPartitions = new LinkedHashMap<>(); @Getter private Map<TimeRange, String> rangeToExtraWhereFallBack = new LinkedHashMap<>(); @Getter - @Setter - private String whereString; - @Getter private Set<Integer> answerableMeasurePhraseIndices = Sets.newHashSet(); @Getter @Setter @@ -137,8 +143,6 @@ public class StorageCandidate implements Candidate, CandidateTable { @Getter private CubeInterface cube; @Getter - private Map<Dimension, CandidateDim> dimsToQuery; - @Getter private Date startTime; @Getter private Date endTime; @@ -164,17 +168,11 @@ public class StorageCandidate implements Candidate, CandidateTable { private int numQueriedParts = 0; public StorageCandidate(StorageCandidate sc) throws LensException { - this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeql()); + this(sc.getCube(), sc.getFact(), sc.getStorageName(), sc.getCubeQueryContext()); this.validUpdatePeriods.addAll(sc.getValidUpdatePeriods()); - this.whereString = sc.whereString; this.fromString = sc.fromString; - this.dimsToQuery = sc.dimsToQuery; this.factColumns = sc.factColumns; this.answerableMeasurePhraseIndices.addAll(sc.answerableMeasurePhraseIndices); - if (sc.getQueryAst() != null) { - this.queryAst = new DefaultQueryAST(); - CandidateUtil.copyASTs(sc.getQueryAst(), new DefaultQueryAST()); - } for (Map.Entry<TimeRange, Set<FactPartition>> entry : sc.getRangeToPartitions().entrySet()) { rangeToPartitions.put(entry.getKey(), new LinkedHashSet<>(entry.getValue())); } @@ -182,37 +180,43 @@ public class StorageCandidate implements Candidate, CandidateTable { this.answerableMeasurePhraseIndices = sc.answerableMeasurePhraseIndices; } - public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, CubeQueryContext cubeql) + public StorageCandidate(CubeInterface cube, CubeFactTable fact, String storageName, CubeQueryContext cubeQueryContext) throws LensException { - if ((cube == null) || (fact == null) || (storageName == null)) { - throw new IllegalArgumentException("Cube,fact and storageName should be non null"); - } this.cube = cube; this.fact = fact; - this.cubeql = cubeql; + this.cubeQueryContext = cubeQueryContext; + if ((getCube() == null) || (fact == null) || (storageName == null)) { + throw new IllegalArgumentException("Cube,fact and storageName should be non null"); + } this.storageName = storageName; this.storageTable = MetastoreUtil.getFactOrDimtableStorageTableName(fact.getName(), storageName); - this.conf = cubeql.getConf(); - this.name = fact.getName(); - this.processTimePartCol = conf.get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); - String formatStr = conf.get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); + this.name = getFact().getName(); + this.processTimePartCol = getConf().get(CubeQueryConfUtil.PROCESS_TIME_PART_COL); + String formatStr = getConf().get(CubeQueryConfUtil.PART_WHERE_CLAUSE_DATE_FORMAT); if (formatStr != null) { this.partWhereClauseFormat = new SimpleDateFormat(formatStr); } - completenessPartCol = conf.get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); - completenessThreshold = conf + completenessPartCol = getConf().get(CubeQueryConfUtil.COMPLETENESS_CHECK_PART_COL); + completenessThreshold = getConf() .getFloat(CubeQueryConfUtil.COMPLETENESS_THRESHOLD, CubeQueryConfUtil.DEFAULT_COMPLETENESS_THRESHOLD); - client = 
cubeql.getMetastoreClient(); - Set<String> storageTblNames = client.getStorageTables(fact.getName(), storageName); - if (storageTblNames.size() > 1) { - isStorageTblsAtUpdatePeriodLevel = true; - } else { - //if this.storageTable is equal to the storage table name it implies isStorageTblsAtUpdatePeriodLevel is false - isStorageTblsAtUpdatePeriodLevel = !storageTblNames.iterator().next().equalsIgnoreCase(storageTable); - } + Set<String> storageTblNames = getCubeMetastoreClient().getStorageTables(fact.getName(), storageName); + isStorageTblsAtUpdatePeriodLevel = storageTblNames.size() > 1 + || !storageTblNames.iterator().next().equalsIgnoreCase(storageTable); setStorageStartAndEndDate(); } + String getTimeRangeWhereClasue(TimeRangeWriter rangeWriter, TimeRange range) + throws LensException { + String rangeWhere = rangeWriter.getTimeRangeWhereClause( + getCubeQueryContext(), getCubeQueryContext().getAliasForTableName(getCube().getName()), + getRangeToPartitions().get(range)); + String fallback = getRangeToExtraWhereFallBack().get(range); + if (StringUtils.isNotBlank(fallback)){ + rangeWhere = "((" + rangeWhere + ") and (" + fallback + "))"; + } + return rangeWhere; + } + /** * Sets Storage candidates start and end time based on underlying storage-tables * @@ -247,7 +251,7 @@ public class StorageCandidate implements Candidate, CandidateTable { * * @throws LensException */ - public void setStorageStartAndEndDate() throws LensException { + void setStorageStartAndEndDate() throws LensException { if (this.startTime != null && !this.isStorageTblsAtUpdatePeriodLevel) { //If the times are already set and are not dependent of update period, no point setting times again. return; @@ -255,8 +259,8 @@ public class StorageCandidate implements Candidate, CandidateTable { List<Date> startDates = new ArrayList<>(); List<Date> endDates = new ArrayList<>(); for (String storageTablePrefix : getValidStorageTableNames()) { - startDates.add(client.getStorageTableStartDate(storageTablePrefix, fact.getName())); - endDates.add(client.getStorageTableEndDate(storageTablePrefix, fact.getName())); + startDates.add(getCubeMetastoreClient().getStorageTableStartDate(storageTablePrefix, fact.getName())); + endDates.add(getCubeMetastoreClient().getStorageTableEndDate(storageTablePrefix, fact.getName())); } this.startTime = Collections.min(startDates); this.endTime = Collections.max(endDates); @@ -267,128 +271,81 @@ public class StorageCandidate implements Candidate, CandidateTable { // In this case skip invalid update periods and get storage tables only for valid ones. Set<String> uniqueStorageTables = new HashSet<>(); for (UpdatePeriod updatePeriod : validUpdatePeriods) { - uniqueStorageTables.add(client.getStorageTableName(fact.getName(), storageName, updatePeriod)); + uniqueStorageTables.add( + getCubeMetastoreClient().getStorageTableName(fact.getName(), storageName, updatePeriod) + ); } return uniqueStorageTables; } else { //Get all storage tables. - return client.getStorageTables(fact.getName(), storageName); + return getCubeMetastoreClient().getStorageTables(fact.getName(), storageName); } } - private void setMissingExpressions(Set<Dimension> queriedDims) throws LensException { - setFromString(String.format("%s", getFromTable())); - setWhereString(joinWithAnd( - genWhereClauseWithDimPartitions(whereString, queriedDims), cubeql.getConf().getBoolean( - CubeQueryConfUtil.REPLACE_TIMEDIM_WITH_PART_COL, CubeQueryConfUtil.DEFAULT_REPLACE_TIMEDIM_WITH_PART_COL) - ? 
getPostSelectionWhereClause() : null)); - if (cubeql.getHavingAST() != null) { - queryAst.setHavingAST(MetastoreUtil.copyAST(cubeql.getHavingAST())); - } + public void addAnswerableMeasurePhraseIndices(int index) { + answerableMeasurePhraseIndices.add(index); } - private String genWhereClauseWithDimPartitions(String originalWhere, Set<Dimension> queriedDims) { - StringBuilder whereBuf; - if (originalWhere != null) { - whereBuf = new StringBuilder(originalWhere); - } else { - whereBuf = new StringBuilder(); - } - // add where clause for all dimensions - if (cubeql != null) { - boolean added = (originalWhere != null); - for (Dimension dim : queriedDims) { - CandidateDim cdim = dimsToQuery.get(dim); - String alias = cubeql.getAliasForTableName(dim.getName()); - if (!cdim.isWhereClauseAdded() && !StringUtils.isBlank(cdim.getWhereClause())) { - appendWhereClause(whereBuf, StorageUtil.getWhereClause(cdim, alias), added); - added = true; - } - } - } - if (whereBuf.length() == 0) { - return null; + @Override + public Candidate explode() throws LensException { + if (splitAtUpdatePeriodLevelIfReq().size() > 1) { + return new UnionCandidate(splitAtUpdatePeriodLevelIfReq(), getCubeQueryContext()); + } else { + return splitAtUpdatePeriodLevelIfReq().iterator().next(); } - return whereBuf.toString(); } - private static void appendWhereClause(StringBuilder filterCondition, String whereClause, boolean hasMore) { - // Make sure we add AND only when there are already some conditions in where - // clause - if (hasMore && !filterCondition.toString().isEmpty() && !StringUtils.isBlank(whereClause)) { - filterCondition.append(" AND "); - } - - if (!StringUtils.isBlank(whereClause)) { - filterCondition.append("("); - filterCondition.append(whereClause); - filterCondition.append(")"); - } + @Override + public String getStorageString(String alias) { + return storageName + " " + alias; } - private String getPostSelectionWhereClause() throws LensException { - return null; + @Override + public AbstractCubeTable getTable() { + return fact; } - void setAnswerableMeasurePhraseIndices(int index) { - answerableMeasurePhraseIndices.add(index); + @Override + public AbstractCubeTable getBaseTable() { + return (AbstractCubeTable) cube; } - public String toHQL(Set<Dimension> queriedDims) throws LensException { - setMissingExpressions(queriedDims); - // Check if the picked candidate is a StorageCandidate and in that case - // update the selectAST with final alias. 
- if (this == cubeql.getPickedCandidate()) { - CandidateUtil.updateFinalAlias(queryAst.getSelectAST(), cubeql); - updateOrderByWithFinalAlias(queryAst.getOrderByAST(), queryAst.getSelectAST()); - } else { - queryAst.setHavingAST(null); - } - return CandidateUtil - .buildHQLString(queryAst.getSelectString(), fromString, whereString, queryAst.getGroupByString(), - queryAst.getOrderByString(), queryAst.getHavingString(), queryAst.getLimitValue()); - } - - /** - * Update Orderby children with final alias used in select - * - * @param orderby Order by AST - * @param select Select AST - */ - private void updateOrderByWithFinalAlias(ASTNode orderby, ASTNode select) { - if (orderby == null) { - return; - } - for (Node orderbyNode : orderby.getChildren()) { - ASTNode orderBychild = (ASTNode) orderbyNode; - for (Node selectNode : select.getChildren()) { - ASTNode selectChild = (ASTNode) selectNode; - if (selectChild.getChildCount() == 2) { - if (HQLParser.getString((ASTNode) selectChild.getChild(0)) - .equals(HQLParser.getString((ASTNode) orderBychild.getChild(0)))) { - ASTNode alias = new ASTNode((ASTNode) selectChild.getChild(1)); - orderBychild.replaceChildren(0, 0, alias); - break; - } - } - } - } + public StorageCandidate copy() throws LensException { + return new StorageCandidate(this); } @Override - public String getStorageString(String alias) { - return storageName + " " + alias; + public boolean isPhraseAnswerable(QueriedPhraseContext phrase) throws LensException { + return phrase.isEvaluable(this); } @Override - public AbstractCubeTable getTable() { - return fact; + public Optional<Date> getColumnStartTime(String column) { + Date startTime = null; + for (String key : getTable().getProperties().keySet()) { + if (key.contains(MetastoreConstants.FACT_COL_START_TIME_PFX)) { + String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_START_TIME_PFX); + if (column.equals(propCol)) { + startTime = getTable().getDateFromProperty(key, false, true); + } + } + } + return Optional.ofNullable(startTime); } @Override - public AbstractCubeTable getBaseTable() { - return (AbstractCubeTable) cube; + public Optional<Date> getColumnEndTime(String column) { + Date endTime = null; + for (String key : getTable().getProperties().keySet()) { + if (key.contains(MetastoreConstants.FACT_COL_END_TIME_PFX)) { + String propCol = StringUtils.substringAfter(key, MetastoreConstants.FACT_COL_END_TIME_PFX); + if (column.equals(propCol)) { + endTime = getTable().getDateFromProperty(key, false, true); + } + } + } + return Optional.ofNullable(endTime); } @Override @@ -419,8 +376,8 @@ private void updatePartitionStorage(FactPartition part) throws LensException { try { - if (client.factPartitionExists(fact, part, storageTable)) { - part.getStorageTables().add(storageTable); + if (getCubeMetastoreClient().factPartitionExists(fact, part, storageTable)) { + part.getStorageTables().add(name); part.setFound(true); } } catch (HiveException e) { @@ -437,7 +394,7 @@ * and [1 oct - 1Dec) will be answered by MONTHLY partitions. The max interval for this query will be MONTHLY. * * 2. Prune storages that do not fall in the query's time range. - * {@link CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)} + * {@link org.apache.lens.cube.metadata.CubeMetastoreClient#isStorageTableCandidateForRange(String, Date, Date)} * * 3. Iterate over max interval. 
In our case it will give two months Oct and Nov. Find partitions for * these two months. Check validity of FactPartitions for Oct and Nov @@ -456,7 +413,7 @@ if (fromDate.equals(toDate) || fromDate.after(toDate)) { return true; } - if (updatePeriods == null | updatePeriods.isEmpty()) { + if (updatePeriods == null || updatePeriods.isEmpty()) { return false; } @@ -467,7 +424,7 @@ } if (maxInterval == UpdatePeriod.CONTINUOUS - && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + && cubeQueryContext.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { FactPartition part = new FactPartition(partCol, fromDate, maxInterval, null, partWhereClauseFormat); partitions.add(part); part.getStorageTables().add(storageName); @@ -479,17 +436,17 @@ return true; } - if (!client.partColExists(this.getFact().getName(), storageName, partCol)) { - log.info("{} does not exist in {}", partCol, storageTable); + if (!getCubeMetastoreClient().partColExists(this.getFact().getName(), storageName, partCol)) { + log.info("{} does not exist in {}", partCol, name); return false; } Date maxIntervalStorageTblStartDate = getStorageTableStartDate(maxInterval); Date maxIntervalStorageTblEndDate = getStorageTableEndDate(maxInterval); - TreeSet<UpdatePeriod> remainingIntervals = new TreeSet<>(updatePeriods); + TreeSet<UpdatePeriod> remainingIntervals = new TreeSet<>(updatePeriods); remainingIntervals.remove(maxInterval); - if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + if (!isCandidatePartiallyValidForTimeRange( maxIntervalStorageTblStartDate, maxIntervalStorageTblEndDate, fromDate, toDate)) { //Check the time range in remainingIntervals as maxInterval is not useful return getPartitions(fromDate, toDate, partCol, partitions, remainingIntervals, @@ -505,7 +462,7 @@ addNonExistingParts, failOnPartialData, missingPartitions); } - int lookAheadNumParts = conf + int lookAheadNumParts = getConf() .getInt(CubeQueryConfUtil.getLookAheadPTPartsKey(maxInterval), CubeQueryConfUtil.DEFAULT_LOOK_AHEAD_PT_PARTS); TimeRange.Iterable.Iterator iter = TimeRange.iterable(ceilFromDate, floorToDate, maxInterval, 1).iterator(); // add partitions from ceilFrom to floorTo @@ -555,10 +512,10 @@ // Get partitions for look ahead process time log.debug("Looking for process time partitions between {} and {}", pdt, nextPdt); Set<FactPartition> processTimeParts = getPartitions( - TimeRange.getBuilder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(), + TimeRange.builder().fromDate(pdt).toDate(nextPdt).partitionColumn(processTimePartCol).build(), newset, true, failOnPartialData, missingPartitions); log.debug("Look ahead partitions: {}", processTimeParts); - TimeRange timeRange = TimeRange.getBuilder().fromDate(dt).toDate(nextDt).build(); + TimeRange timeRange = TimeRange.builder().fromDate(dt).toDate(nextDt).build(); for (FactPartition pPart : processTimeParts) { log.debug("Looking for finer partitions in pPart: {}", pPart); for (Date date : timeRange.iterable(pPart.getPeriod(), 1)) { @@ -600,9 +557,14 @@ } return getPartitions(fromDate, ceilFromDate, partCol, partitions, 
remainingIntervals, - addNonExistingParts, failOnPartialData, missingPartitions) - && getPartitions(floorToDate, toDate, partCol, partitions, remainingIntervals, - addNonExistingParts, failOnPartialData, missingPartitions); + addNonExistingParts, failOnPartialData, missingPartitions) + && getPartitions(floorToDate, toDate, partCol, partitions, remainingIntervals, + addNonExistingParts, failOnPartialData, missingPartitions); + } + + private boolean isCandidatePartiallyValidForTimeRange(Date startDate, Date endDate, Date fromDate, Date toDate) { + return Stream.of(startDate, fromDate).max(naturalOrder()).orElse(startDate) + .before(Stream.of(endDate, toDate).min(naturalOrder()).orElse(endDate)); } @Override @@ -611,13 +573,13 @@ public class StorageCandidate implements Candidate, CandidateTable { // Check the measure tags. if (!evaluateMeasuresCompleteness(timeRange)) { log.info("Storage candidate:{} has partitions with incomplete data: {} for given ranges: {}", this, - dataCompletenessMap, cubeql.getTimeRanges()); + dataCompletenessMap, cubeQueryContext.getTimeRanges()); if (failOnPartialData) { return false; } } PartitionRangesForPartitionColumns missingParts = new PartitionRangesForPartitionColumns(); - PruneCauses<StorageCandidate> storagePruningMsgs = cubeql.getStoragePruningMsgs(); + PruneCauses<Candidate> storagePruningMsgs = cubeQueryContext.getStoragePruningMsgs(); Set<String> unsupportedTimeDims = Sets.newHashSet(); Set<String> partColsQueried = Sets.newHashSet(); partColsQueried.add(timeRange.getPartitionColumn()); @@ -641,20 +603,23 @@ public class StorageCandidate implements Candidate, CandidateTable { TimeRange prevRange = timeRange; String sep = ""; while (rangeParts.isEmpty()) { - String timeDim = cubeql.getBaseCube().getTimeDimOfPartitionColumn(partCol); - if (partColNotSupported && !CandidateUtil.factHasColumn(getFact(), timeDim)) { - unsupportedTimeDims.add(cubeql.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn())); + String timeDim = cubeQueryContext.getBaseCube().getTimeDimOfPartitionColumn(partCol); + if (partColNotSupported && !getFact().hasColumn(timeDim)) { + unsupportedTimeDims.add( + cubeQueryContext.getBaseCube().getTimeDimOfPartitionColumn(timeRange.getPartitionColumn()) + ); break; } - TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeql); + TimeRange fallBackRange = getFallbackRange(prevRange, this.getFact().getName(), cubeQueryContext); log.info("No partitions for range:{}. 
fallback range: {}", timeRange, fallBackRange); if (fallBackRange == null) { break; } partColsQueried.add(fallBackRange.getPartitionColumn()); rangeParts = getPartitions(fallBackRange, validUpdatePeriods, true, failOnPartialData, missingParts); - extraWhereClauseFallback.append(sep) - .append(prevRange.toTimeDimWhereClause(cubeql.getAliasForTableName(cubeql.getCube()), timeDim)); + extraWhereClauseFallback.append(sep).append( + prevRange.toTimeDimWhereClause(cubeQueryContext.getAliasForTableName(cubeQueryContext.getCube()), timeDim) + ); sep = " AND "; prevRange = fallBackRange; partCol = prevRange.getPartitionColumn(); @@ -668,7 +633,7 @@ public class StorageCandidate implements Candidate, CandidateTable { if (!unsupportedTimeDims.isEmpty()) { log.info("Not considering storage candidate:{} as it doesn't support time dimensions: {}", this, unsupportedTimeDims); - cubeql.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims)); + cubeQueryContext.addStoragePruningMsg(this, timeDimNotSupported(unsupportedTimeDims)); return false; } Set<String> nonExistingParts = missingParts.toSet(partColsQueried); @@ -688,7 +653,7 @@ public class StorageCandidate implements Candidate, CandidateTable { @Override public Set<FactPartition> getParticipatingPartitions() { Set<FactPartition> allPartitions = new HashSet<>(numQueriedParts); - for (Set<FactPartition> rangePartitions : rangeToPartitions.values()) { + for (Set<FactPartition> rangePartitions : rangeToPartitions.values()) { allPartitions.addAll(rangePartitions); } return allPartitions; @@ -703,14 +668,14 @@ public class StorageCandidate implements Candidate, CandidateTable { Set<String> measureTag = new HashSet<>(); Map<String, String> tagToMeasureOrExprMap = new HashMap<>(); - processExpressionsForCompleteness(cubeql, measureTag, tagToMeasureOrExprMap); + processExpressionsForCompleteness(cubeQueryContext, measureTag, tagToMeasureOrExprMap); - Set<String> measures = cubeql.getQueriedMsrs(); + Set<String> measures = cubeQueryContext.getQueriedMsrs(); if (measures == null) { measures = new HashSet<>(); } for (String measure : measures) { - processCubeColForDataCompleteness(cubeql, measure, measure, measureTag, tagToMeasureOrExprMap); + processCubeColForDataCompleteness(cubeQueryContext, measure, measure, measureTag, tagToMeasureOrExprMap); } //Checking if dataCompletenessTag is set for the fact if (measureTag.isEmpty()) { @@ -718,7 +683,7 @@ public class StorageCandidate implements Candidate, CandidateTable { return true; } boolean isDataComplete = false; - DataCompletenessChecker completenessChecker = client.getCompletenessChecker(); + DataCompletenessChecker completenessChecker = getCubeMetastoreClient().getCompletenessChecker(); DateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); formatter.setTimeZone(TimeZone.getTimeZone("UTC")); if (!timeRange.getPartitionColumn().equals(completenessPartCol)) { @@ -763,46 +728,20 @@ public class StorageCandidate implements Candidate, CandidateTable { return expr.isEvaluable(this); } - /** - * Update selectAST for StorageCandidate - * 1. Delete projected select expression if it's not answerable by StorageCandidate. - * 2. Replace the queried alias with select alias if both are different in a select expr. 
- * - * @param cubeql - * @throws LensException - */ - - public void updateAnswerableSelectColumns(CubeQueryContext cubeql) throws LensException { - // update select AST with selected fields - int currentChild = 0; - for (int i = 0; i < cubeql.getSelectAST().getChildCount(); i++) { - ASTNode selectExpr = (ASTNode) queryAst.getSelectAST().getChild(currentChild); - Set<String> exprCols = HQLParser.getColsInExpr(cubeql.getAliasForTableName(cubeql.getCube()), selectExpr); - if (getColumns().containsAll(exprCols)) { - ASTNode aliasNode = HQLParser.findNodeByPath(selectExpr, HiveParser.Identifier); - String alias = cubeql.getSelectPhrases().get(i).getSelectAlias(); - if (aliasNode != null) { - String queryAlias = aliasNode.getText(); - if (!queryAlias.equals(alias)) { - // replace the alias node - ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); - queryAst.getSelectAST().getChild(currentChild) - .replaceChildren(selectExpr.getChildCount() - 1, selectExpr.getChildCount() - 1, newAliasNode); - } - } else { - // add column alias - ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); - queryAst.getSelectAST().getChild(currentChild).addChild(newAliasNode); - } - } else { - queryAst.getSelectAST().deleteChild(currentChild); - currentChild--; - } - currentChild++; - } + @Override + public boolean isExpressionEvaluable(String expr) { + return isExpressionEvaluable( + getCubeQueryContext().getExprCtx().getExpressionContext( + expr, getCubeQueryContext().getAliasForTableName(getBaseTable().getName())) + ); } @Override + public boolean isDimAttributeEvaluable(String dim) throws LensException { + return getCubeQueryContext().getDeNormCtx() + .addRefUsage(getCubeQueryContext(), this, dim, getCubeQueryContext().getCube().getName()); + } + @Override public boolean equals(Object obj) { if (super.equals(obj)) { return true; @@ -833,24 +772,6 @@ public class StorageCandidate implements Candidate, CandidateTable { this.validUpdatePeriods.add(updatePeriod); } - void updateFromString(CubeQueryContext query, Set<Dimension> queryDims, - Map<Dimension, CandidateDim> dimsToQuery) throws LensException { - this.dimsToQuery = dimsToQuery; - String alias = cubeql.getAliasForTableName(cubeql.getCube().getName()); - fromString = getAliasForTable(alias); - if (query.isAutoJoinResolved()) { - fromString = query.getAutoJoinCtx().getFromString(fromString, this, queryDims, dimsToQuery, query, cubeql); - } - } - - private String getFromTable() throws LensException { - if (cubeql.isAutoJoinResolved()) { - return fromString; - } else { - return cubeql.getQBFromString(this, getDimsToQuery()); - } - } - public String getAliasForTable(String alias) { String database = SessionState.get().getCurrentDatabase(); String ret; @@ -866,7 +787,8 @@ public class StorageCandidate implements Candidate, CandidateTable { } boolean isUpdatePeriodUseful(UpdatePeriod updatePeriod) { - return cubeql.getTimeRanges().stream().anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod)); + return getCubeQueryContext().getTimeRanges().stream() + .anyMatch(timeRange -> isUpdatePeriodUseful(timeRange, updatePeriod)); } /** @@ -876,17 +798,17 @@ public class StorageCandidate implements Candidate, CandidateTable { * level or at storage or fact level. 
* @param timeRange The time range * @param updatePeriod Update period - * @return Whether it's useful + * @return Whether it's useful */ private boolean isUpdatePeriodUseful(TimeRange timeRange, UpdatePeriod updatePeriod) { try { - if (!CandidateUtil.isCandidatePartiallyValidForTimeRange(getStorageTableStartDate(updatePeriod), - getStorageTableEndDate(updatePeriod), timeRange.getFromDate(), timeRange.getToDate())) { + if (!timeRange.truncate(getStorageTableStartDate(updatePeriod), + getStorageTableEndDate(updatePeriod)).isValid()) { return false; } - Date storageTblStartDate = getStorageTableStartDate(updatePeriod); - Date storageTblEndDate = getStorageTableEndDate(updatePeriod); - TimeRange.getBuilder() //TODO date calculation to move to util method and reused + Date storageTblStartDate = getStorageTableStartDate(updatePeriod); + Date storageTblEndDate = getStorageTableEndDate(updatePeriod); + TimeRange.builder() //TODO date calculation to move to util method and reused .fromDate(timeRange.getFromDate().after(storageTblStartDate) ? timeRange.getFromDate() : storageTblStartDate) .toDate(timeRange.getToDate().before(storageTblEndDate) ? timeRange.getToDate() : storageTblEndDate) .partitionColumn(timeRange.getPartitionColumn()) @@ -909,7 +831,7 @@ return isTimeRangeCoverable(timeRange.getFromDate(), timeRange.getToDate(), validUpdatePeriods); } - /** + /* * Is the time range coverable by given update periods. * Extracts the max update period, then extracts maximum amount of range from the middle that this update * period can cover. Then recurses on the remaining ranges on the left and right side of the extracted chunk @@ -935,7 +857,7 @@ } if (maxInterval == UpdatePeriod.CONTINUOUS - && cubeql.getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { + && getCubeQueryContext().getRangeWriter().getClass().equals(BetweenTimeRangeWriter.class)) { return true; } @@ -943,7 +865,7 @@ Date maxIntervalStorageTableEndDate = getStorageTableEndDate(maxInterval); Set<UpdatePeriod> remainingIntervals = Sets.difference(intervals, Sets.newHashSet(maxInterval)); - if (!CandidateUtil.isCandidatePartiallyValidForTimeRange( + if (!isCandidatePartiallyValidForTimeRange( maxIntervalStorageTableStartDate, maxIntervalStorageTableEndDate, timeRangeStart, timeRangeEnd)) { //Check the time range in remainingIntervals as maxInterval is not useful return isTimeRangeCoverable(timeRangeStart, timeRangeEnd, remainingIntervals); @@ -968,8 +890,8 @@ //In this case the start and end times are at storage level and will be the same for all update periods. return this.startTime; } - return client.getStorageTableStartDate( - client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + return getCubeMetastoreClient().getStorageTableStartDate( + getCubeMetastoreClient().getStorageTableName(fact.getName(), storageName, interval), fact.getName()); } private Date getStorageTableEndDate(UpdatePeriod interval) throws LensException { @@ -977,8 +899,8 @@ //In this case the start and end times are at storage level and will be the same for all update periods. 
return this.endTime; } - return client.getStorageTableEndDate( - client.getStorageTableName(fact.getName(), storageName, interval), fact.getName()); + return getCubeMetastoreClient().getStorageTableEndDate( + getCubeMetastoreClient().getStorageTableName(fact.getName(), storageName, interval), fact.getName()); } @@ -1004,16 +926,19 @@ public class StorageCandidate implements Candidate, CandidateTable { } private Collection<StorageCandidate> getPeriodSpecificStorageCandidates() throws LensException { - List<StorageCandidate> periodSpecificScList = new ArrayList<>(participatingUpdatePeriods.size()); - StorageCandidate updatePeriodSpecificSc; - for (UpdatePeriod period : participatingUpdatePeriods) { - updatePeriodSpecificSc = new StorageCandidate(this); - updatePeriodSpecificSc.truncatePartitions(period); - updatePeriodSpecificSc.setResolvedName(client.getStorageTableName(fact.getName(), + if (periodSpecificStorageCandidates == null) { + List<StorageCandidate> periodSpecificScList = new ArrayList<>(participatingUpdatePeriods.size()); + StorageCandidate updatePeriodSpecificSc; + for (UpdatePeriod period : participatingUpdatePeriods) { + updatePeriodSpecificSc = copy(); + updatePeriodSpecificSc.truncatePartitions(period); + updatePeriodSpecificSc.setResolvedName(getCubeMetastoreClient().getStorageTableName(fact.getName(), storageName, period)); - periodSpecificScList.add(updatePeriodSpecificSc); + periodSpecificScList.add(updatePeriodSpecificSc); + } + periodSpecificStorageCandidates = periodSpecificScList; } - return periodSpecificScList; + return periodSpecificStorageCandidates; } /** @@ -1025,17 +950,18 @@ public class StorageCandidate implements Candidate, CandidateTable { Iterator<Map.Entry<TimeRange, Set<FactPartition>>> rangeItr = rangeToPartitions.entrySet().iterator(); while (rangeItr.hasNext()) { Map.Entry<TimeRange, Set<FactPartition>> rangeEntry = rangeItr.next(); - Iterator<FactPartition> partitionItr = rangeEntry.getValue().iterator(); - while (partitionItr.hasNext()) { - if (!partitionItr.next().getPeriod().equals(updatePeriod)) { - partitionItr.remove(); - } - } + rangeEntry.getValue().removeIf(factPartition -> !factPartition.getPeriod().equals(updatePeriod)); if (rangeEntry.getValue().isEmpty()) { rangeItr.remove(); } } } - + @Override + public StorageCandidateHQLContext toQueryWriterContext(Map<Dimension, CandidateDim> dimsToQuery, + CubeQueryContext rootCubeQueryContext) throws LensException { + DefaultQueryAST ast = DefaultQueryAST.fromStorageCandidate(null, getCubeQueryContext()); + ast.copyFrom(getCubeQueryContext()); + return new StorageCandidateHQLContext(this, Maps.newHashMap(dimsToQuery), ast, rootCubeQueryContext); + } } http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidateHQLContext.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidateHQLContext.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidateHQLContext.java new file mode 100644 index 0000000..f5f468f --- /dev/null +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageCandidateHQLContext.java @@ -0,0 +1,164 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.lens.cube.parse; + + +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +import org.apache.lens.cube.metadata.CubeInterface; +import org.apache.lens.cube.metadata.Dimension; +import org.apache.lens.server.api.error.LensException; + +import org.apache.hadoop.hive.ql.lib.Node; +import org.apache.hadoop.hive.ql.parse.ASTNode; +import org.apache.hadoop.hive.ql.parse.HiveParser; + +import org.antlr.runtime.CommonToken; + +import lombok.Getter; + +/** + * Created on 31/03/17. + */ +public class StorageCandidateHQLContext extends DimHQLContext { + @Getter + private StorageCandidate storageCandidate; + private CubeQueryContext rootCubeQueryContext; + + StorageCandidateHQLContext(StorageCandidate storageCandidate, Map<Dimension, CandidateDim> dimsToQuery, + QueryAST ast, CubeQueryContext rootCubeQueryContext) throws LensException { + super(storageCandidate.getCubeQueryContext(), dimsToQuery, ast); + this.storageCandidate = storageCandidate; + this.rootCubeQueryContext = rootCubeQueryContext; + getCubeQueryContext().addRangeClauses(this); + if (!isRoot()) { + getQueryAst().setHavingAST(null); + } + } + private boolean isRoot() { + return Objects.equals(getCubeQueryContext(), rootCubeQueryContext) + && Objects.equals(getStorageCandidate(), getCubeQueryContext().getPickedCandidate()); + } + public CubeQueryContext getCubeQueryContext() { + return storageCandidate.getCubeQueryContext(); + } + + public void updateFromString() throws LensException { + String alias = getCubeQueryContext().getAliasForTableName(getCube().getName()); + setFrom(storageCandidate.getAliasForTable(alias)); + if (getCubeQueryContext().isAutoJoinResolved()) { + setFrom(getCubeQueryContext().getAutoJoinCtx().getFromString( + getFrom(), this, getDimsToQuery(), getCubeQueryContext())); + } + } + + CubeInterface getCube() { + return storageCandidate.getCubeQueryContext().getCube(); + } + + @Override + protected String getFromTable() throws LensException { + if (storageCandidate.getCubeQueryContext().isAutoJoinResolved()) { + return getFrom(); + } else { + return storageCandidate.getCubeQueryContext().getQBFromString(storageCandidate, getDimsToQuery()); + } + } + + @Override + public void updateDimFilterWithFactFilter() throws LensException { + if (!getStorageCandidate().getStorageName().isEmpty()) { + String qualifiedStorageTable = getStorageCandidate().getStorageName(); + String storageTable = qualifiedStorageTable.substring(qualifiedStorageTable.indexOf(".") + 1); + String where = getCubeQueryContext().getWhere(this, getCubeQueryContext().getAutoJoinCtx(), + getQueryAst().getWhereAST(), + getCubeQueryContext().getAliasForTableName(getStorageCandidate().getBaseTable().getName()), + getCubeQueryContext().shouldReplaceDimFilterWithFactFilter(), storageTable, getDimsToQuery()); + setWhere(where); + } + } + + private void updateAnswerableSelectColumns() throws LensException { + // update select AST with 
selected fields + int currentChild = 0; + for (int i = 0; i < getCubeQueryContext().getSelectAST().getChildCount(); i++) { + ASTNode selectExpr = (ASTNode) queryAst.getSelectAST().getChild(currentChild); + Set<String> exprCols = HQLParser.getColsInExpr(getCubeQueryContext().getAliasForTableName(getCube()), selectExpr); + if (getStorageCandidate().getColumns().containsAll(exprCols)) { + ASTNode aliasNode = HQLParser.findNodeByPath(selectExpr, HiveParser.Identifier); + String alias = getCubeQueryContext().getSelectPhrases().get(i).getSelectAlias(); + if (aliasNode != null) { + String queryAlias = aliasNode.getText(); + if (!queryAlias.equals(alias)) { + // replace the alias node + ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); + queryAst.getSelectAST().getChild(currentChild) + .replaceChildren(selectExpr.getChildCount() - 1, selectExpr.getChildCount() - 1, newAliasNode); + } + } else { + // add column alias + ASTNode newAliasNode = new ASTNode(new CommonToken(HiveParser.Identifier, alias)); + queryAst.getSelectAST().getChild(currentChild).addChild(newAliasNode); + } + } else { + queryAst.getSelectAST().deleteChild(currentChild); + currentChild--; + } + currentChild++; + } + } + + @Override + protected void setMissingExpressions() throws LensException { + setFrom(getFromTable()); + setWhere(genWhereClauseWithDimPartitions(getWhere())); + if (isRoot()) { + if (Objects.equals(getStorageCandidate(), getCubeQueryContext().getPickedCandidate())) { + updateAnswerableSelectColumns(); + // Check if the picked candidate is a StorageCandidate and in that case + // update the selectAST with final alias. + CandidateUtil.updateFinalAlias(queryAst.getSelectAST(), getCubeQueryContext()); + updateOrderByWithFinalAlias(queryAst.getOrderByAST(), queryAst.getSelectAST()); + setPrefix(getCubeQueryContext().getInsertClause()); + } + } + } + + private void updateOrderByWithFinalAlias(ASTNode orderby, ASTNode select) { + if (orderby == null) { + return; + } + for (Node orderbyNode : orderby.getChildren()) { + ASTNode orderBychild = (ASTNode) orderbyNode; + for (Node selectNode : select.getChildren()) { + ASTNode selectChild = (ASTNode) selectNode; + if (selectChild.getChildCount() == 2) { + if (HQLParser.getString((ASTNode) selectChild.getChild(0)) + .equals(HQLParser.getString((ASTNode) orderBychild.getChild(0)))) { + ASTNode alias = new ASTNode((ASTNode) selectChild.getChild(1)); + orderBychild.replaceChildren(0, 0, alias); + break; + } + } + } + } + } +} http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java index 957b9ff..291712b 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageTableResolver.java @@ -99,7 +99,7 @@ class StorageTableResolver implements ContextRewriter { if (cubeql.getAutoJoinCtx() != null) { // After all candidates are pruned after storage resolver, prune join paths. 
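// The pruneAllPaths call below now receives the union of columns across all surviving
// candidates rather than the candidates themselves. A plausible sketch of what a helper
// like CandidateUtil.getColumnsFromCandidates could do (an assumption for illustration;
// the real implementation is not shown in this diff):
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

static Set<String> columnsFromCandidates(Collection<? extends Candidate> candidates) {
  // flatten every candidate's column set into one lookup set for join-path pruning
  return candidates.stream()
    .flatMap(candidate -> candidate.getColumns().stream())
    .collect(Collectors.toSet());
}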
cubeql.getAutoJoinCtx() - .pruneAllPaths(cubeql.getCube(), CandidateUtil.getStorageCandidates(cubeql.getCandidates()), null); + .pruneAllPaths(cubeql.getCube(), CandidateUtil.getColumnsFromCandidates(cubeql.getCandidates()), null); cubeql.getAutoJoinCtx().pruneAllPathsForCandidateDims(cubeql.getCandidateDimTables()); cubeql.getAutoJoinCtx().refreshJoinPathColumns(); } @@ -137,7 +137,7 @@ class StorageTableResolver implements ContextRewriter { } else if (failOnPartialData && !isComplete) { candidateIterator.remove(); log.info("Not considering candidate:{} as its data is not is not complete", candidate); - Set<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate); + Collection<StorageCandidate> scSet = CandidateUtil.getStorageCandidates(candidate); for (StorageCandidate sc : scSet) { if (!sc.getNonExistingPartitions().isEmpty()) { cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.missingPartitions(sc.getNonExistingPartitions())); @@ -145,12 +145,13 @@ class StorageTableResolver implements ContextRewriter { cubeql.addStoragePruningMsg(sc, incompletePartitions(sc.getDataCompletenessMap())); } } - } else if (candidate.getParticipatingPartitions().isEmpty() - && candidate instanceof StorageCandidate - && ((StorageCandidate) candidate).getNonExistingPartitions().isEmpty()) { - candidateIterator.remove(); - cubeql.addCandidatePruningMsg(candidate, - new CandidateTablePruneCause(CandidateTablePruneCode.NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE)); + } else if (candidate.getParticipatingPartitions().isEmpty()) { + if (candidate instanceof StorageCandidate + && ((StorageCandidate) candidate).getNonExistingPartitions().isEmpty()) { + candidateIterator.remove(); + cubeql.addCandidatePruningMsg(candidate, + new CandidateTablePruneCause(CandidateTablePruneCode.NO_FACT_UPDATE_PERIODS_FOR_GIVEN_RANGE)); + } } } } @@ -248,114 +249,113 @@ class StorageTableResolver implements ContextRewriter { Iterator<Candidate> it = cubeql.getCandidates().iterator(); while (it.hasNext()) { Candidate c = it.next(); - assert (c instanceof StorageCandidate); - StorageCandidate sc = (StorageCandidate) c; - String storageTable = sc.getStorageName(); - // first check: if the storage is supported on driver - if (!isStorageSupportedOnDriver(storageTable)) { - log.info("Skipping storage: {} as it is not supported", storageTable); - cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE)); - it.remove(); - continue; - } - String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName())); - List<String> validFactStorageTables = - StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); - storageTable = sc.getStorageTable(); - // Check if storagetable is in the list of valid storages. 
- if (validFactStorageTables != null && !validFactStorageTables.contains(storageTable)) { - log.info("Skipping storage table {} as it is not valid", storageTable); - cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE)); - it.remove(); - continue; - } - List<String> validUpdatePeriods = CubeQueryConfUtil - .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName())); - boolean isUpdatePeriodForStorageAdded = false; - Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>(); - - if (cubeql.getTimeRanges().stream().noneMatch(range -> CandidateUtil.isPartiallyValidForTimeRange(sc, range))) { - cubeql.addStoragePruningMsg(sc, - new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); - it.remove(); - continue; - } - - // Populate valid update periods abd check validity at update period level - for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) { - if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) { - // if user supplied max interval, all intervals larger than that are useless. - log.info("Skipping update period {} for candidate {} since it's more than max interval supplied({})", - updatePeriod, sc.getStorageTable(), maxInterval); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.UPDATE_PERIOD_BIGGER_THAN_MAX); - } else if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) { - // if user supplied valid update periods, other update periods are useless - log.info("Skipping update period {} for candidate {} for storage {} since it's invalid", - updatePeriod, sc.getStorageTable(), storageTable); - skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID); - } else if (!sc.isUpdatePeriodUseful(updatePeriod)) { - // if the storage candidate finds this update useful to keep looking at the time ranges queried - skipUpdatePeriodCauses.put(updatePeriod.toString(), - SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD); - } else { - isUpdatePeriodForStorageAdded = true; - sc.addValidUpdatePeriod(updatePeriod); + if (c instanceof StorageCandidate) { + StorageCandidate sc = (StorageCandidate) c; + // first check: if the storage is supported on driver + if (!isStorageSupportedOnDriver(sc.getStorageName())) { + log.info("Skipping storage: {} as it is not supported", sc.getStorageName()); + cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.UNSUPPORTED_STORAGE)); + it.remove(); + continue; } - } - // For DEBUG purpose only to see why some update periods are skipped. - if (!skipUpdatePeriodCauses.isEmpty()) { - sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses); - } - // if no update periods were added in previous section, we skip this storage candidate - if (!isUpdatePeriodForStorageAdded) { - if (skipUpdatePeriodCauses.values().stream().allMatch( - SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) { - // all update periods bigger than query range, it means time range not answerable. + String str = conf.get(CubeQueryConfUtil.getValidStorageTablesKey(sc.getFact().getName())); + List<String> validFactStorageTables = + StringUtils.isBlank(str) ? null : Arrays.asList(StringUtils.split(str.toLowerCase(), ",")); + // Check if storagetable is in the list of valid storages. 
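// The validFactStorageTables list above is parsed from a comma-separated configuration
// value. A minimal, self-contained sketch of the same parsing idiom; parseValidTables
// is a hypothetical name, and the real key comes from CubeQueryConfUtil.getValidStorageTablesKey:
import java.util.Arrays;
import java.util.List;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.conf.Configuration;

static List<String> parseValidTables(Configuration conf, String key) {
  String raw = conf.get(key);
  // a blank value means "no restriction"; otherwise normalize to lower case for membership checks
  return StringUtils.isBlank(raw) ? null : Arrays.asList(StringUtils.split(raw.toLowerCase(), ","));
}
// The membership check against this (possibly null) list follows: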
+ if (validFactStorageTables != null && !validFactStorageTables.contains(sc.getStorageTable())) {
+ log.info("Skipping storage table {} as it is not valid", sc.getStorageTable());
+ cubeql.addStoragePruningMsg(sc, new CandidateTablePruneCause(CandidateTablePruneCode.INVALID_STORAGE));
+ it.remove();
+ continue;
+ }
+ List<String> validUpdatePeriods = CubeQueryConfUtil
+ .getStringList(conf, CubeQueryConfUtil.getValidUpdatePeriodsKey(sc.getFact().getName(), sc.getStorageName()));
+ boolean isUpdatePeriodForStorageAdded = false;
+ Map<String, SkipUpdatePeriodCode> skipUpdatePeriodCauses = new HashMap<>();
+
+ if (!sc.isPartiallyValidForTimeRanges(cubeql.getTimeRanges())) {
cubeql.addStoragePruningMsg(sc,
new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE));
+ it.remove();
+ continue;
+ }
+
+ // Populate valid update periods and check validity at update period level
+ for (UpdatePeriod updatePeriod : sc.getFact().getUpdatePeriods().get(sc.getStorageName())) {
+ if (maxInterval != null && updatePeriod.compareTo(maxInterval) > 0) {
+ // if user supplied max interval, all intervals larger than that are useless.
+ log.info("Skipping update period {} for candidate {} since it's more than max interval supplied({})",
+ updatePeriod, sc.getStorageTable(), maxInterval);
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.UPDATE_PERIOD_BIGGER_THAN_MAX);
+ } else if (validUpdatePeriods != null && !validUpdatePeriods.contains(updatePeriod.name().toLowerCase())) {
+ // if user supplied valid update periods, other update periods are useless
+ log.info("Skipping update period {} for candidate {} for storage {} since it's invalid",
+ updatePeriod, sc.getName(), sc.getStorageName());
+ skipUpdatePeriodCauses.put(updatePeriod.toString(), SkipUpdatePeriodCode.INVALID);
+ } else if (!sc.isUpdatePeriodUseful(updatePeriod)) {
+ // the storage candidate does not find this update period useful for the queried time ranges
+ skipUpdatePeriodCauses.put(updatePeriod.toString(),
+ SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD);
+ } else {
+ isUpdatePeriodForStorageAdded = true;
+ sc.addValidUpdatePeriod(updatePeriod);
+ }
+ }
+ // For DEBUG purposes only, to see why some update periods are skipped.
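// Sketch of the decision rule applied just below: when no update period survived and
// every recorded skip cause is TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD, the whole
// candidate is pruned as unable to answer the time range; mixed causes are reported
// individually via updatePeriodsRejected. (Illustrative helper, not the committed code.)
import java.util.Map;

static boolean allCausesAreRangeMismatches(Map<String, SkipUpdatePeriodCode> causes) {
  // allMatch(...) is vacuously true on an empty map, so callers should first confirm
  // at least one cause was recorded (here: no valid update period was added)
  return causes.values().stream()
    .allMatch(SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals);
}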
+ if (!skipUpdatePeriodCauses.isEmpty()) { + sc.setUpdatePeriodRejectionCause(skipUpdatePeriodCauses); } - it.remove(); - } else { - //set the dates again as they can change based on ValidUpdatePeriod - sc.setStorageStartAndEndDate(); - Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size()); - for (TimeRange range : cubeql.getTimeRanges()) { - CandidateTablePruneCause pruningCauseForThisTimeRange = null; - if (!CandidateUtil.isPartiallyValidForTimeRange(sc, range)) { - //This is the prune cause - pruningCauseForThisTimeRange = - new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); - } else if (!sc.getValidUpdatePeriods().contains(UpdatePeriod.CONTINUOUS)) { - if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) { - pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn()); - TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql); - while (fallBackRange != null) { - pruningCauseForThisTimeRange = null; - if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), - fallBackRange.getPartitionColumn())) { - pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn()); - fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql); - } else { - if (!CandidateUtil.isPartiallyValidForTimeRange(sc, fallBackRange)) { - pruningCauseForThisTimeRange = - new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE); + // if no update periods were added in previous section, we skip this storage candidate + if (!isUpdatePeriodForStorageAdded) { + if (skipUpdatePeriodCauses.values().stream().allMatch( + SkipUpdatePeriodCode.TIME_RANGE_NOT_ANSWERABLE_BY_UPDATE_PERIOD::equals)) { + // all update periods bigger than query range, it means time range not answerable. + cubeql.addStoragePruningMsg(sc, + new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE)); + } else { // Update periods are rejected for multiple reasons. 
+ cubeql.addStoragePruningMsg(sc, CandidateTablePruneCause.updatePeriodsRejected(skipUpdatePeriodCauses));
+ }
+ it.remove();
+ } else {
+ //set the dates again as they can change based on ValidUpdatePeriod
+ sc.setStorageStartAndEndDate();
+ Set<CandidateTablePruneCause> allPruningCauses = new HashSet<>(cubeql.getTimeRanges().size());
+ for (TimeRange range : cubeql.getTimeRanges()) {
+ CandidateTablePruneCause pruningCauseForThisTimeRange = null;
+ if (!sc.isPartiallyValidForTimeRange(range)) {
+ //This is the prune cause
+ pruningCauseForThisTimeRange =
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+ } else if (!sc.getValidUpdatePeriods().contains(UpdatePeriod.CONTINUOUS)) {
+ if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(), range.getPartitionColumn())) {
+ pruningCauseForThisTimeRange = partitionColumnsMissing(range.getPartitionColumn());
+ TimeRange fallBackRange = StorageUtil.getFallbackRange(range, sc.getFact().getName(), cubeql);
+ while (fallBackRange != null) {
+ pruningCauseForThisTimeRange = null;
+ if (!client.partColExists(sc.getFact().getName(), sc.getStorageName(),
+ fallBackRange.getPartitionColumn())) {
+ pruningCauseForThisTimeRange = partitionColumnsMissing(fallBackRange.getPartitionColumn());
+ fallBackRange = StorageUtil.getFallbackRange(fallBackRange, sc.getFact().getName(), cubeql);
+ } else {
+ if (!sc.isPartiallyValidForTimeRange(fallBackRange)) {
+ pruningCauseForThisTimeRange =
+ new CandidateTablePruneCause(CandidateTablePruneCode.TIME_RANGE_NOT_ANSWERABLE);
+ }
+ break;
}
- break;
}
}
}
- }
- if (pruningCauseForThisTimeRange != null) {
- allPruningCauses.add(pruningCauseForThisTimeRange);
+ if (pruningCauseForThisTimeRange != null) {
+ allPruningCauses.add(pruningCauseForThisTimeRange);
+ }
+ }
+ if (!allPruningCauses.isEmpty()) {
+ // TODO if this storage can answer at least one time range, why prune it?
+ it.remove();
+ cubeql.addStoragePruningMsg(sc, allPruningCauses.toArray(new CandidateTablePruneCause[0]));
}
}
}
http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
----------------------------------------------------------------------
diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
index f5cd540..8713547 100644
--- a/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
+++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/StorageUtil.java
@@ -83,18 +83,6 @@ public final class StorageUtil {
return sb.toString();
}
- public static String joinWithAnd(String... clauses) {
- StringBuilder sb = new StringBuilder();
- String sep = "((";
- for (String clause : clauses) {
- if (clause != null && !clause.isEmpty()) {
- sb.append(sep).append(clause);
- sep = ") AND (";
- }
- }
- return sb.append(sep.equals("((") ?
"" : "))").toString(); - } - /** * Get minimal set of storages which cover the queried partitions * @@ -108,12 +96,7 @@ public final class StorageUtil { // invert the answering tables map and put in inverted map for (FactPartition part : answeringParts) { for (String table : part.getStorageTables()) { - Set<FactPartition> partsCovered = invertedMap.get(table); - if (partsCovered == null) { - partsCovered = new TreeSet<FactPartition>(); - invertedMap.put(table, partsCovered); - } - partsCovered.add(part); + invertedMap.computeIfAbsent(table, k -> new TreeSet<>()).add(part); } } // there exist only one storage @@ -200,7 +183,7 @@ public final class StorageUtil { DateUtil.TimeDiff diff2 = DateUtil.TimeDiff.parseFrom(matcher.group(3).trim()); String relatedTimeDim = matcher.group(1).trim(); String fallbackPartCol = baseCube.getPartitionColumnOfTimeDim(relatedTimeDim); - return TimeRange.getBuilder().fromDate(diff2.negativeOffsetFrom(range.getFromDate())) + return TimeRange.builder().fromDate(diff2.negativeOffsetFrom(range.getFromDate())) .toDate(diff1.negativeOffsetFrom(range.getToDate())).partitionColumn(fallbackPartCol).build(); } http://git-wip-us.apache.org/repos/asf/lens/blob/b58749e2/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java ---------------------------------------------------------------------- diff --git a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java index 760ce82..8e77a82 100644 --- a/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java +++ b/lens-cube/src/main/java/org/apache/lens/cube/parse/TimerangeResolver.java @@ -91,7 +91,7 @@ class TimerangeResolver implements ContextRewriter { private void processTimeRangeFunction(CubeQueryContext cubeql, ASTNode timenode, ASTNode parent, int childIndex) throws LensException { - TimeRange.TimeRangeBuilder builder = TimeRange.getBuilder(); + TimeRange.TimeRangeBuilder builder = TimeRange.builder(); builder.astNode(timenode); builder.parent(parent); builder.childIndex(childIndex);