This is an automated email from the ASF dual-hosted git repository.

kunalkapoor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new f248a4d  Why is this PR needed? When add segment is done on a main
table that also has an SI on it, and a filter query is then fired on an SI
column, only the segments which are loaded can be considered for SI pruning;
external segments should be queried from the main table.
f248a4d is described below

commit f248a4da3a231a4173e08eced5c90de8c636946a
Author: akashrn5 <akashnilu...@gmail.com>
AuthorDate: Wed Mar 4 15:17:02 2020 +0530

    Why is this PR needed?
    When add segment is done on a main table that also has a secondary index
    (SI) on it, and a filter query is then fired on an SI column, only the
    segments which are loaded can be considered for SI pruning; external
    segments should be queried from the main table.
    
    What changes were proposed in this PR?
    Handle external segments while pruning: if the segment is external, get
    the filter from the external segment filter tree.
    
    This closes #3656
---
 .../carbondata/core/datamap/DataMapFilter.java     |  37 +++++++
 .../carbondata/core/datamap/TableDataMap.java      | 110 +++++++++++++++------
 .../scan/executor/impl/AbstractQueryExecutor.java  |   8 +-
 .../secondaryindex/TestSIWithAddSegment.scala      |   7 +-
 .../command/management/CarbonAddLoadCommand.scala  |   4 +
 .../events/SILoadEventListener.scala               |   4 +
 .../load/CarbonInternalLoaderUtil.java             |  10 +-
 7 files changed, 140 insertions(+), 40 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
index 4d47565..f85fd53 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/DataMapFilter.java
@@ -23,6 +23,7 @@ import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.Set;
 
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.datastore.block.SegmentProperties;
 import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier;
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
@@ -31,6 +32,8 @@ import 
org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure;
 import org.apache.carbondata.core.scan.executor.util.RestructureUtil;
 import org.apache.carbondata.core.scan.expression.ColumnExpression;
 import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.conditional.InExpression;
+import org.apache.carbondata.core.scan.expression.logical.AndExpression;
 import org.apache.carbondata.core.scan.filter.FilterExpressionProcessor;
 import org.apache.carbondata.core.scan.filter.intf.FilterOptimizer;
 import org.apache.carbondata.core.scan.filter.optimizer.RangeFilterOptmizer;
@@ -49,6 +52,10 @@ public class DataMapFilter implements Serializable {
 
   private Expression expression;
 
+  private Expression externalSegmentFilter;
+
+  private FilterResolverIntf externalSegmentResolver;
+
   private FilterResolverIntf resolver;
 
   private String serializedExpression;
@@ -65,6 +72,7 @@ public class DataMapFilter implements Serializable {
     resolve(lazyResolve);
     if (expression != null) {
       checkIfFilterColumnExistsInTable();
+      initializeExternalSegmentFilter();
       try {
         this.serializedExpression = 
ObjectSerializationUtil.convertObjectToString(expression);
       } catch (IOException e) {
@@ -100,6 +108,20 @@ public class DataMapFilter implements Serializable {
     this.table = table;
   }
 
+  private void initializeExternalSegmentFilter() {
+    if ((expression instanceof AndExpression) && (((AndExpression) expression)
+        .getRight() instanceof InExpression) && 
(expression.getChildren().get(1).getChildren()
+        .get(0) instanceof ColumnExpression) && (((ColumnExpression) 
expression.getChildren().get(1)
+        .getChildren().get(0))).getColumnName()
+        .equalsIgnoreCase(CarbonCommonConstants.POSITION_ID)) {
+      externalSegmentFilter = ((AndExpression) expression).getLeft();
+      if (externalSegmentFilter != null) {
+        processFilterExpression(null, null);
+        externalSegmentResolver = resolver != null ? resolver.getLeft() : 
resolveFilter().getLeft();
+      }
+    }
+  }
+
   private Set<String> extractColumnExpressions(Expression expression) {
     Set<String> columnExpressionList = new HashSet<>();
     for (Expression expressions: expression.getChildren()) {
@@ -161,6 +183,7 @@ public class DataMapFilter implements Serializable {
 
   public void setExpression(Expression expression) {
     this.expression = expression;
+    initializeExternalSegmentFilter();
   }
 
   public FilterResolverIntf getResolver() {
@@ -185,6 +208,20 @@ public class DataMapFilter implements Serializable {
         .hasColumnDriftOnSegment(table, segmentProperties));
   }
 
+  Expression getExternalSegmentFilter() {
+    if (externalSegmentFilter == null) {
+      return expression;
+    }
+    return externalSegmentFilter;
+  }
+
+  public FilterResolverIntf getExternalSegmentResolver() {
+    if (externalSegmentResolver == null) {
+      return resolver;
+    }
+    return externalSegmentResolver;
+  }
+
   public void processFilterExpression() {
     processFilterExpression(null, null);
   }
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java 
b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
index 036118f..62424fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java
@@ -193,30 +193,54 @@ public final class TableDataMap extends 
OperationEventListener {
       if (dataMaps.get(segment).isEmpty() || dataMaps.get(segment) == null) {
         continue;
       }
+      boolean isExternalSegment = segment.getSegmentPath() != null;
       List<Blocklet> pruneBlocklets = new ArrayList<>();
       SegmentProperties segmentProperties =
           segmentPropertiesFetcher.getSegmentProperties(segment, partitions);
       if (filter.isResolvedOnSegment(segmentProperties)) {
-        FilterExecuter filterExecuter = FilterUtil
-            .getFilterExecuterTree(filter.getResolver(), segmentProperties,
-                null, table.getMinMaxCacheColumns(segmentProperties),
-                false);
+        FilterExecuter filterExecuter;
+        if (!isExternalSegment) {
+          filterExecuter = FilterUtil
+              .getFilterExecuterTree(filter.getResolver(), segmentProperties, 
null,
+                  table.getMinMaxCacheColumns(segmentProperties), false);
+        } else {
+          filterExecuter = FilterUtil
+              .getFilterExecuterTree(filter.getExternalSegmentResolver(), 
segmentProperties, null,
+                  table.getMinMaxCacheColumns(segmentProperties), false);
+        }
         for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets.addAll(
-              dataMap.prune(filter.getResolver(), segmentProperties, 
partitions, filterExecuter,
-                  this.table));
+          if (!isExternalSegment) {
+            pruneBlocklets.addAll(dataMap
+                .prune(filter.getResolver(), segmentProperties, partitions, 
filterExecuter,
+                    this.table));
+          } else {
+            pruneBlocklets.addAll(dataMap
+                .prune(filter.getExternalSegmentResolver(), segmentProperties, 
partitions,
+                    filterExecuter, this.table));
+          }
         }
       } else {
+        FilterExecuter filterExecuter;
         Expression expression = filter.getExpression();
-        FilterExecuter filterExecuter = FilterUtil
-            .getFilterExecuterTree(new DataMapFilter(segmentProperties, table,
-                            expression).getResolver(), segmentProperties,
-                null, table.getMinMaxCacheColumns(segmentProperties),
-                false);
+        if (!isExternalSegment) {
+          filterExecuter = FilterUtil.getFilterExecuterTree(
+              new DataMapFilter(segmentProperties, table, 
expression).getResolver(),
+              segmentProperties, null, 
table.getMinMaxCacheColumns(segmentProperties), false);
+        } else {
+          filterExecuter = FilterUtil.getFilterExecuterTree(
+              new DataMapFilter(segmentProperties, table, 
expression).getExternalSegmentResolver(),
+              segmentProperties, null, 
table.getMinMaxCacheColumns(segmentProperties), false);
+        }
         for (DataMap dataMap : dataMaps.get(segment)) {
-          pruneBlocklets.addAll(
-              dataMap.prune(filter.getExpression(), segmentProperties,
-                  partitions, table, filterExecuter));
+          if (!isExternalSegment) {
+            pruneBlocklets.addAll(dataMap
+                .prune(filter.getExpression(), segmentProperties, partitions, 
table,
+                    filterExecuter));
+          } else {
+            pruneBlocklets.addAll(dataMap
+                .prune(filter.getExternalSegmentFilter(), segmentProperties, 
partitions, table,
+                    filterExecuter));
+          }
         }
       }
       blocklets.addAll(
@@ -325,31 +349,59 @@ public final class TableDataMap extends 
OperationEventListener {
             SegmentProperties segmentProperties =
                 
segmentPropertiesFetcher.getSegmentPropertiesFromDataMap(dataMapList.get(0));
             Segment segment = segmentDataMapGroup.getSegment();
+            boolean isExternalSegment = segment.getSegmentPath() != null;
             if (filter.isResolvedOnSegment(segmentProperties)) {
-              FilterExecuter filterExecuter = FilterUtil
-                  .getFilterExecuterTree(filter.getResolver(), 
segmentProperties,
-                      null, table.getMinMaxCacheColumns(segmentProperties),
-                      false);
+              FilterExecuter filterExecuter;
+              if (!isExternalSegment) {
+                filterExecuter = FilterUtil
+                    .getFilterExecuterTree(filter.getResolver(), 
segmentProperties, null,
+                        table.getMinMaxCacheColumns(segmentProperties), false);
+              } else {
+                filterExecuter = FilterUtil
+                    
.getFilterExecuterTree(filter.getExternalSegmentResolver(), segmentProperties,
+                        null, table.getMinMaxCacheColumns(segmentProperties), 
false);
+              }
               for (int i = segmentDataMapGroup.getFromIndex();
                    i <= segmentDataMapGroup.getToIndex(); i++) {
-                List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
-                    filter.getResolver(), segmentProperties, partitions, 
filterExecuter, table);
+                List<Blocklet> dmPruneBlocklets;
+                if (!isExternalSegment) {
+                  dmPruneBlocklets = dataMapList.get(i)
+                      .prune(filter.getResolver(), segmentProperties, 
partitions, filterExecuter,
+                          table);
+                } else {
+                  dmPruneBlocklets = dataMapList.get(i)
+                      .prune(filter.getExternalSegmentResolver(), 
segmentProperties, partitions,
+                          filterExecuter, table);
+                }
                 pruneBlocklets.addAll(addSegmentId(
                     
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
                     segment));
               }
             } else {
               Expression filterExpression = filter.getNewCopyOfExpression();
-              FilterExecuter filterExecuter = FilterUtil
-                  .getFilterExecuterTree(new DataMapFilter(segmentProperties, 
table,
-                                  filterExpression).getResolver(), 
segmentProperties,
-                      null, table.getMinMaxCacheColumns(segmentProperties),
-                      false);
+              FilterExecuter filterExecuter;
+              if (!isExternalSegment) {
+                filterExecuter = FilterUtil.getFilterExecuterTree(
+                    new DataMapFilter(segmentProperties, table, 
filterExpression).getResolver(),
+                    segmentProperties, null, 
table.getMinMaxCacheColumns(segmentProperties), false);
+              } else {
+                filterExecuter = FilterUtil.getFilterExecuterTree(
+                    new DataMapFilter(segmentProperties, table, 
filterExpression)
+                        .getExternalSegmentResolver(), segmentProperties, null,
+                    table.getMinMaxCacheColumns(segmentProperties), false);
+              }
               for (int i = segmentDataMapGroup.getFromIndex();
                    i <= segmentDataMapGroup.getToIndex(); i++) {
-                List<Blocklet> dmPruneBlocklets = dataMapList.get(i).prune(
-                        filterExpression, segmentProperties, partitions, table,
-                    filterExecuter);
+                List<Blocklet> dmPruneBlocklets;
+                if (!isExternalSegment) {
+                  dmPruneBlocklets = dataMapList.get(i)
+                      .prune(filterExpression, segmentProperties, partitions, 
table,
+                          filterExecuter);
+                } else {
+                  dmPruneBlocklets = dataMapList.get(i)
+                      .prune(filter.getExternalSegmentFilter(), 
segmentProperties, partitions,
+                          table, filterExecuter);
+                }
                 pruneBlocklets.addAll(addSegmentId(
                     
blockletDetailsFetcher.getExtendedBlocklets(dmPruneBlocklets, segment),
                     segment));
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index 0c49199..dbf405a 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -487,8 +487,12 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
             queryProperties.complexFilterDimension));
     if (null != queryModel.getDataMapFilter()) {
       FilterResolverIntf filterResolverIntf;
-      // loading the filter executor tree for filter evaluation
-      filterResolverIntf = queryModel.getDataMapFilter().getResolver();
+      if (!filePath.startsWith(queryModel.getTable().getTablePath())) {
+        filterResolverIntf = 
queryModel.getDataMapFilter().getExternalSegmentResolver();
+      } else {
+        // loading the filter executor tree for filter evaluation
+        filterResolverIntf = queryModel.getDataMapFilter().getResolver();
+      }
       blockExecutionInfo.setFilterExecuterTree(
           FilterUtil.getFilterExecuterTree(filterResolverIntf, 
segmentProperties,
               blockExecutionInfo.getComlexDimensionInfoMap(), false));
diff --git 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
index 31f7826..59fbbc2 100644
--- 
a/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
+++ 
b/index/secondary-index/src/test/scala/org/apache/carbondata/spark/testsuite/secondaryindex/TestSIWithAddSegment.scala
@@ -26,7 +26,6 @@ import org.apache.carbondata.core.metadata.datatype.Field
 import org.apache.carbondata.core.util.path.CarbonTablePath
 import org.apache.carbondata.sdk.file.{CarbonSchemaReader, 
CarbonWriterBuilder, Schema}
 
-@Ignore
 class TestSIWithAddSegment extends QueryTest with BeforeAndAfterAll {
 
   val newSegmentPath: String = warehouse + "/newsegment/"
@@ -64,14 +63,14 @@ class TestSIWithAddSegment extends QueryTest with 
BeforeAndAfterAll {
     
assert(d.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
   }
 
-  ignore("compare results of SI and NI after adding segments") {
+  test("compare results of SI and NI after adding segments") {
     val siResult = sql("select * from maintable where c = 'm'")
     val niResult = sql("select * from maintable where ni(c = 'm')")
     
assert(!niResult.queryExecution.executedPlan.isInstanceOf[BroadCastSIFilterPushJoin])
     checkAnswer(siResult, niResult)
   }
 
-  ignore("test SI creation after adding segments") {
+  test("test SI creation after adding segments") {
     sql("create table maintable1(a string, b int, c string) stored as 
carbondata")
     sql("insert into maintable1 select 'k',1,'k'")
     sql("insert into maintable1 select 'l',2,'l'")
@@ -93,7 +92,7 @@ class TestSIWithAddSegment extends QueryTest with 
BeforeAndAfterAll {
     checkAnswer(siResult, niResult)
   }
 
-  ignore("test query on SI with all external segments") {
+  test("test query on SI with all external segments") {
     sql("drop table if exists maintable1")
     sql("create table maintable1(a string, b int, c string) stored as 
carbondata")
     sql("CREATE INDEX maintable1_si  on table maintable1 (c) as 'carbondata'")
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
index 7cc87c2..2ff71bc 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
@@ -278,6 +278,10 @@ case class CarbonAddLoadCommand(
       operationContext.setProperty(
         carbonTable.getTableUniqueName + "_Segment",
         model.getSegmentId)
+      // when this event is triggered, SI load listener will be called for all 
the SI tables under
+      // this main table, no need to load the SI tables for add load command, 
so add this property
+      // to check in SI loadevent listener to avoid loading to SI.
+      operationContext.setProperty("isAddLoad", "true")
       val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
         new LoadTablePreStatusUpdateEvent(
           carbonTable.getCarbonTableIdentifier,
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
index 65e295d..d78b923 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListener.scala
@@ -42,6 +42,10 @@ class SILoadEventListener extends OperationEventListener 
with Logging {
   override def onEvent(event: Event, operationContext: OperationContext): Unit 
= {
     event match {
       case _: LoadTablePreStatusUpdateEvent =>
+        if (operationContext.getProperty("isAddLoad") != null &&
+            operationContext.getProperty("isAddLoad").toString.toBoolean) {
+          return
+        }
         LOGGER.info("Load pre status update event-listener called")
         val loadTablePreStatusUpdateEvent = 
event.asInstanceOf[LoadTablePreStatusUpdateEvent]
         val carbonLoadModel = loadTablePreStatusUpdateEvent.getCarbonLoadModel
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
index 6b8fee2..c6747ec 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/load/CarbonInternalLoaderUtil.java
@@ -308,17 +308,17 @@ public class CarbonInternalLoaderUtil {
    */
   public static boolean checkMainTableSegEqualToSISeg(String carbonTablePath,
       String indexTablePath) {
-    List<String> mainList =
-        
getListOfValidSlices(SegmentStatusManager.readLoadMetadata(carbonTablePath));
+    List<String> mainTableSegmentsList =
+        
getListOfValidSlices(SegmentStatusManager.readCarbonMetaData(carbonTablePath));
     List<String> indexList =
         
getListOfValidSlices(SegmentStatusManager.readLoadMetadata(indexTablePath));
-    Collections.sort(mainList);
+    Collections.sort(mainTableSegmentsList);
     Collections.sort(indexList);
-    if (indexList.size() != mainList.size()) {
+    if (indexList.size() != mainTableSegmentsList.size()) {
       return false;
     }
     for (int i = 0; i < indexList.size(); i++) {
-      if (!indexList.get(i).equalsIgnoreCase(mainList.get(i))) {
+      if (!indexList.get(i).equalsIgnoreCase(mainTableSegmentsList.get(i))) {
         return false;
       }
     }

Reply via email to