This is an automated email from the ASF dual-hosted git repository. kunalkapoor pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/carbondata.git
The following commit(s) were added to refs/heads/master by this push: new f248a4d Why is this PR needed? When add segment is done on main table which has SI also on it. Then filter query is fired on SI column, only segments which are loaded can be considered for SI pruning and external segment should be queried from main table. f248a4d is described below commit f248a4da3a231a4173e08eced5c90de8c636946a Author: akashrn5 <akashnilu...@gmail.com> AuthorDate: Wed Mar 4 15:17:02 2020 +0530 Why is this PR needed? When add segment is done on main table which has SI also on it. Then filter query is fired on SI column, only segments which are loaded can be considered for SI pruning and external segment should be queried from main table. What changes were proposed in this PR? Handle while pruning for external segment. If external segment, get the filter from the external segment filter tree. This closes #3656 --- .../carbondata/core/datamap/DataMapFilter.java | 37 +++++++ .../carbondata/core/datamap/TableDataMap.java | 110 +++++++++++++++------ .../scan/executor/impl/AbstractQueryExecutor.java | 8 +- .../secondaryindex/TestSIWithAddSegment.scala | 7 +- .../command/management/CarbonAddLoadCommand.scala | 4 + .../events/SILoadEventListener.scala | 4 + .../load/CarbonInternalLoaderUtil.java | 10 +- 7 files changed, 140 insertions(+), 40 deletions(-) diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java index 4d47565..f85fd53 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.Set; +import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.block.SegmentProperties; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; 
import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -31,6 +32,8 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.scan.executor.util.RestructureUtil; import org.apache.carbondata.core.scan.expression.ColumnExpression; import org.apache.carbondata.core.scan.expression.Expression; +import org.apache.carbondata.core.scan.expression.conditional.InExpression; +import org.apache.carbondata.core.scan.expression.logical.AndExpression; import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor; import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer; import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer; @@ -49,6 +52,10 @@ public class DataMapFilter implements Serializable { private Expression expression; + private Expression externalSegmentFilter; + + private FilterResolverIntf externalSegmentResolver; + private FilterResolverIntf resolver; private String serializedExpression; @@ -65,6 +72,7 @@ public class DataMapFilter implements Serializable { resolve(lazyResolve); if (expression != null) { checkIfFilterColumnExistsInTable(); + initializeExternalSegmentFilter(); try { this.serializedExpression = ObjectSerializationUtil.convertObjectToString(expression); } catch (IOException e) { @@ -100,6 +108,20 @@ public class DataMapFilter implements Serializable { this.table = table; } + private void initializeExternalSegmentFilter() { + if ((expression instanceof AndExpression) && (((AndExpression) expression) + .getRight() instanceof InExpression) && (expression.getChildren().get(1).getChildren() + .get(0) instanceof ColumnExpression) && (((ColumnExpression) expression.getChildren().get(1) + .getChildren().get(0))).getColumnName() + .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) { + externalSegmentFilter = ((AndExpression) expression).getLeft(); + if (externalSegmentFilter != null) { + processFilterExpression(null, null); + 
externalSegmentResolver = resolver != null ? resolver.getLeft() : resolveFilter().getLeft(); + } + } + } + private Set<String> extractColumnExpressions(Expression expression) { Set<String> columnExpressionList = new HashSet<>(); for (Expression expressions: expression.getChildren()) { @@ -161,6 +183,7 @@ public class DataMapFilter implements Serializable { public void setExpression(Expression expression) { this.expression = expression; + initializeExternalSegmentFilter(); } public FilterResolverIntf getResolver() { @@ -185,6 +208,20 @@ public class DataMapFilter implements Serializable { .hasColumnDriftOnSegment(table, segmentProperties)); } + Expression getExternalSegmentFilter() { + if (externalSegmentFilter == null) { + return expression; + } + return externalSegmentFilter; + } + + public FilterResolverIntf getExternalSegmentResolver() { + if (externalSegmentResolver == null) { + return resolver; + } + return externalSegmentResolver; + } + public void processFilterExpression() { processFilterExpression(null, null); } diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 036118f..62424fe 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -193,30 +193,54 @@ public final class TableDataMap extends OperationEventListener { if (dataMaps.get(segment).isEmpty() || dataMaps.get(segment) == null) { continue; } + boolean isExternalSegment = segment.getSegmentPath() != null; List<Blocklet> pruneBlocklets = new ArrayList<>(); SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentProperties(segment, partitions); if (filter.isResolvedOnSegment(segmentProperties)) { - FilterExecuter filterExecuter = FilterUtil - .getFilterExecuterTree(filter.getResolver(), segmentProperties, - null, table.getMinMaxCacheColumns(segmentProperties), - 
false); + FilterExecuter filterExecuter; + if (!isExternalSegment) { + filterExecuter = FilterUtil + .getFilterExecuterTree(filter.getResolver(), segmentProperties, null, + table.getMinMaxCacheColumns(segmentProperties), false); + } else { + filterExecuter = FilterUtil + .getFilterExecuterTree(filter.getExternalSegmentResolver(), segmentProperties, null, + table.getMinMaxCacheColumns(segmentProperties), false); + } for (DataMap dataMap : dataMaps.get(segment)) { - pruneBlocklets.addAll( - dataMap.prune(filter.getResolver(), segmentProperties, partitions, filterExecuter, - this.table)); + if (!isExternalSegment) { + pruneBlocklets.addAll(dataMap + .prune(filter.getResolver(), segmentProperties, partitions, filterExecuter, + this.table)); + } else { + pruneBlocklets.addAll(dataMap + .prune(filter.getExternalSegmentResolver(), segmentProperties, partitions, + filterExecuter, this.table)); + } } } else { + FilterExecuter filterExecuter; Expression expression = filter.getExpression(); - FilterExecuter filterExecuter = FilterUtil - .getFilterExecuterTree(new DataMapFilter(segmentProperties, table, - expression).getResolver(), segmentProperties, - null, table.getMinMaxCacheColumns(segmentProperties), - false); + if (!isExternalSegment) { + filterExecuter = FilterUtil.getFilterExecuterTree( + new DataMapFilter(segmentProperties, table, expression).getResolver(), + segmentProperties, null, table.getMinMaxCacheColumns(segmentProperties), false); + } else { + filterExecuter = FilterUtil.getFilterExecuterTree( + new DataMapFilter(segmentProperties, table, expression).getExternalSegmentResolver(), + segmentProperties, null, table.getMinMaxCacheColumns(segmentProperties), false); + } for (DataMap dataMap : dataMaps.get(segment)) { - pruneBlocklets.addAll( - dataMap.prune(filter.getExpression(), segmentProperties, - partitions, table, filterExecuter)); + if (!isExternalSegment) { + pruneBlocklets.addAll(dataMap + .prune(filter.getExpression(), segmentProperties, partitions, 
table, + filterExecuter)); + } else { + pruneBlocklets.addAll(dataMap + .prune(filter.getExternalSegmentFilter(), segmentProperties, partitions, table, + filterExecuter)); + } } } blocklets.addAll( @@ -325,31 +349,59 @@ public final class TableDataMap extends OperationEventListener { SegmentProperties segmentProperties = segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0)); Segment segment = segmentDataMapGroup.getSegment(); + boolean isExternalSegment = segment.getSegmentPath() != null; if (filter.isResolvedOnSegment(segmentProperties)) { - FilterExecuter filterExecuter = FilterUtil - .getFilterExecuterTree(filter.getResolver(), segmentProperties, - null, table.getMinMaxCacheColumns(segmentProperties), - false); + FilterExecuter filterExecuter; + if (!isExternalSegment) { + filterExecuter = FilterUtil + .getFilterExecuterTree(filter.getResolver(), segmentProperties, null, + table.getMinMaxCacheColumns(segmentProperties), false); + } else { + filterExecuter = FilterUtil + .getFilterExecuterTree(filter.getExternalSegmentResolver(), segmentProperties, + null, table.getMinMaxCacheColumns(segmentProperties), false); + } for (int i = segmentDataMapGroup.getFromIndex(); i <= segmentDataMapGroup.getToIndex(); i++) { - List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune( - filter.getResolver(), segmentProperties, partitions, filterExecuter, table); + List<Blocklet> dmPruneBlocklets; + if (!isExternalSegment) { + dmPruneBlocklets = dataMapList.get(i) + .prune(filter.getResolver(), segmentProperties, partitions, filterExecuter, + table); + } else { + dmPruneBlocklets = dataMapList.get(i) + .prune(filter.getExternalSegmentResolver(), segmentProperties, partitions, + filterExecuter, table); + } pruneBlocklets.addAll(addSegmentId( blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment), segment)); } } else { Expression filterExpression = filter.getNewCopyOfExpression(); - FilterExecuter filterExecuter = FilterUtil - 
.getFilterExecuterTree(new DataMapFilter(segmentProperties, table, - filterExpression).getResolver(), segmentProperties, - null, table.getMinMaxCacheColumns(segmentProperties), - false); + FilterExecuter filterExecuter; + if (!isExternalSegment) { + filterExecuter = FilterUtil.getFilterExecuterTree( + new DataMapFilter(segmentProperties, table, filterExpression).getResolver(), + segmentProperties, null, table.getMinMaxCacheColumns(segmentProperties), false); + } else { + filterExecuter = FilterUtil.getFilterExecuterTree( + new DataMapFilter(segmentProperties, table, filterExpression) + .getExternalSegmentResolver(), segmentProperties, null, + table.getMinMaxCacheColumns(segmentProperties), false); + } for (int i = segmentDataMapGroup.getFromIndex(); i <= segmentDataMapGroup.getToIndex(); i++) { - List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune( - filterExpression, segmentProperties, partitions, table, - filterExecuter); + List<Blocklet> dmPruneBlocklets; + if (!isExternalSegment) { + dmPruneBlocklets = dataMapList.get(i) + .prune(filterExpression, segmentProperties, partitions, table, + filterExecuter); + } else { + dmPruneBlocklets = dataMapList.get(i) + .prune(filter.getExternalSegmentFilter(), segmentProperties, partitions, + table, filterExecuter); + } pruneBlocklets.addAll(addSegmentId( blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment), segment)); diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java index 0c49199..dbf405a 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java @@ -487,8 +487,12 @@ public abstract class AbstractQueryExecutor<E> implements QueryExecutor<E> { queryProperties.complexFilterDimension)); if (null != 
queryModel.getDataMapFilter()) { FilterResolverIntf filterResolverIntf; - // loading the filter executor tree for filter evaluation - filterResolverIntf = queryModel.getDataMapFilter().getResolver(); + if (!filePath.startsWith(queryModel.getTable().getTablePath())) { + filterResolverIntf = queryModel.getDataMapFilter().getExternalSegmentResolver(); + } else { + // loading the filter executor tree for filter evaluation + filterResolverIntf = queryModel.getDataMapFilter().getResolver(); + } blockExecutionInfo.setFilterExecuterTree( FilterUtil.getFilterExecuterTree(filterResolverIntf, segmentProperties, blockExecutionInfo.getComlexDimensionInfoMap(), false)); diff --git a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala index 31f7826..59fbbc2 100644 --- a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala +++ b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala @@ -26,7 +26,6 @@ import org.apache.carbondata.core.metadata.datatype.Field import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.sdk.file.{CarbonSchemaReader, CarbonWriterBuilder, Schema} -@Ignore class TestSIWithAddSegment extends QueryTest with BeforeAndAfterAll { val newSegmentPath: String = warehouse + "/newsegment/" @@ -64,14 +63,14 @@ class TestSIWithAddSegment extends QueryTest with BeforeAndAfterAll { assert(d.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin]) } - ignore("compare results of SI and NI after adding segments") { + test("compare results of SI and NI after adding segments") { val siResult = sql("select * from maintable where c = 'm'") val niResult = sql("select * from maintable where ni(c = 'm')") 
assert(!niResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin]) checkAnswer(siResult, niResult) } - ignore("test SI creation after adding segments") { + test("test SI creation after adding segments") { sql("create table maintable1(a string, b int, c string) stored as carbondata") sql("insert into maintable1 select 'k',1,'k'") sql("insert into maintable1 select 'l',2,'l'") @@ -93,7 +92,7 @@ class TestSIWithAddSegment extends QueryTest with BeforeAndAfterAll { checkAnswer(siResult, niResult) } - ignore("test query on SI with all external segments") { + test("test query on SI with all external segments") { sql("drop table if exists maintable1") sql("create table maintable1(a string, b int, c string) stored as carbondata") sql("CREATE INDEX maintable1_si on table maintable1 (c) as 'carbondata'") diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala index 7cc87c2..2ff71bc 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala @@ -278,6 +278,10 @@ case class CarbonAddLoadCommand( operationContext.setProperty( carbonTable.getTableUniqueName + "_Segment", model.getSegmentId) + // when this event is triggered, SI load listener will be called for all the SI tables under + // this main table, no need to load the SI tables for add load command, so add this property + // to check in SI loadevent listener to avoid loading to SI. 
+ operationContext.setProperty("isAddLoad", "true") val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent = new LoadTablePreStatusUpdateEvent( carbonTable.getCarbonTableIdentifier, diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala index 65e295d..d78b923 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala @@ -42,6 +42,10 @@ class SILoadEventListener extends OperationEventListener with Logging { override def onEvent(event: Event, operationContext: OperationContext): Unit = { event match { case _: LoadTablePreStatusUpdateEvent => + if (operationContext.getProperty("isAddLoad") != null && + operationContext.getProperty("isAddLoad").toString.toBoolean) { + return + } LOGGER.info("Load pre status update event-listener called") val loadTablePreStatusUpdateEvent = event.asInstanceOf[LoadTablePreStatusUpdateEvent] val carbonLoadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel diff --git a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java index 6b8fee2..c6747ec 100644 --- a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java +++ b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java @@ -308,17 +308,17 @@ public class CarbonInternalLoaderUtil { */ public static boolean checkMainTableSegEqualToSISeg(String carbonTablePath, String indexTablePath) { - List<String> mainList = - getListOfValidSlices(SegmentStatusManager.readLoadMetadata(carbonTablePath)); + List<String> 
mainTableSegmentsList = + getListOfValidSlices(SegmentStatusManager.readCarbonMetaData(carbonTablePath)); List<String> indexList = getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTablePath)); - Collections.sort(mainList); + Collections.sort(mainTableSegmentsList); Collections.sort(indexList); - if (indexList.size() != mainList.size()) { + if (indexList.size() != mainTableSegmentsList.size()) { return false; } for (int i = 0; i < indexList.size(); i++) { - if (!indexList.get(i).equalsIgnoreCase(mainList.get(i))) { + if (!indexList.get(i).equalsIgnoreCase(mainTableSegmentsList.get(i))) { return false; } }