[CARBONDATA-1472] Optimize memory and fix nosort queries

1. Use UnsafeMemoryManager for dimension chunks as well to avoid memory leaks.
2. Fix filters on non-sort columns.
3. Optimize CarbonScanRDD.

This closes #1346


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/887310fc
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/887310fc
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/887310fc

Branch: refs/heads/branch-1.2
Commit: 887310fc75e8c20c82929d2d92114887cecf44df
Parents: dde2f4c
Author: Ravindra Pesala <ravi.pes...@gmail.com>
Authored: Sun Sep 10 14:57:09 2017 +0530
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Wed Sep 13 22:03:26 2017 +0800

----------------------------------------------------------------------
 .../core/constants/CarbonCommonConstants.java   |  4 ++
 .../UnsafeAbstractDimensionDataChunkStore.java  | 17 +++++---
 .../core/memory/MemoryAllocatorFactory.java     | 46 --------------------
 .../core/memory/UnsafeMemoryManager.java        | 21 ++++++---
 .../executor/impl/AbstractQueryExecutor.java    |  6 +--
 .../executer/RangeValueFilterExecuterImpl.java  | 10 +++--
 ...velRangeLessThanEqualFilterExecuterImpl.java |  8 +++-
 .../RowLevelRangeLessThanFiterExecuterImpl.java |  8 +++-
 .../carbondata/hadoop/AbstractRecordReader.java |  2 -
 .../carbondata/spark/rdd/CarbonScanRDD.scala    | 43 +++++++++++++-----
 10 files changed, 84 insertions(+), 81 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
index 5a68f60..0348bd1 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
@@ -1371,6 +1371,10 @@ public final class CarbonCommonConstants {
 
   public static final String USE_DISTRIBUTED_DATAMAP_DEFAULT = "false";
 
+  public static final String CARBON_USE_BLOCKLET_DISTRIBUTION = 
"carbon.blocklet.distribution";
+
+  public static final String CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT = "true";
+
   private CarbonCommonConstants() {
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
index 704f2d3..22c2e16 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/datastore/chunk/store/impl/unsafe/UnsafeAbstractDimensionDataChunkStore.java
@@ -20,9 +20,11 @@ package 
org.apache.carbondata.core.datastore.chunk.store.impl.unsafe;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import 
org.apache.carbondata.core.datastore.chunk.store.DimensionDataChunkStore;
 import org.apache.carbondata.core.memory.CarbonUnsafe;
-import org.apache.carbondata.core.memory.MemoryAllocatorFactory;
 import org.apache.carbondata.core.memory.MemoryBlock;
+import org.apache.carbondata.core.memory.MemoryException;
+import org.apache.carbondata.core.memory.UnsafeMemoryManager;
 import org.apache.carbondata.core.scan.result.vector.CarbonColumnVector;
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo;
 
 /**
  * Responsibility is to store dimension data in memory. storage can be on heap
@@ -60,6 +62,8 @@ public abstract class UnsafeAbstractDimensionDataChunkStore 
implements Dimension
    */
   protected boolean isMemoryOccupied;
 
+  private final long taskId = 
ThreadLocalTaskInfo.getCarbonTaskInfo().getTaskId();
+
   /**
    * Constructor
    *
@@ -69,9 +73,12 @@ public abstract class UnsafeAbstractDimensionDataChunkStore 
implements Dimension
    */
   public UnsafeAbstractDimensionDataChunkStore(long totalSize, boolean 
isInvertedIdex,
       int numberOfRows) {
-    // allocating the data page
-    this.dataPageMemoryBlock =
-        
MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().allocate(totalSize);
+    try {
+      // allocating the data page
+      this.dataPageMemoryBlock = 
UnsafeMemoryManager.allocateMemoryWithRetry(taskId, totalSize);
+    } catch (MemoryException e) {
+      throw new RuntimeException(e);
+    }
     this.isExplicitSorted = isInvertedIdex;
   }
 
@@ -116,7 +123,7 @@ public abstract class UnsafeAbstractDimensionDataChunkStore 
implements Dimension
       return;
     }
     // free data page memory
-    
MemoryAllocatorFactory.INSATANCE.getMemoryAllocator().free(dataPageMemoryBlock);
+    UnsafeMemoryManager.INSTANCE.freeMemory(taskId, dataPageMemoryBlock);
     isMemoryReleased = true;
     this.dataPageMemoryBlock = null;
     this.isMemoryOccupied = false;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
 
b/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
deleted file mode 100644
index e55af93..0000000
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/MemoryAllocatorFactory.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.carbondata.core.memory;
-
-import org.apache.carbondata.core.constants.CarbonCommonConstants;
-import org.apache.carbondata.core.util.CarbonProperties;
-
-/**
- * Factory class to to get the memory allocator instance
- */
-public class MemoryAllocatorFactory {
-
-  private MemoryAllocator memoryAllocator;
-
-  public static final MemoryAllocatorFactory INSATANCE = new 
MemoryAllocatorFactory();
-
-  private MemoryAllocatorFactory() {
-    boolean offHeap = Boolean.parseBoolean(CarbonProperties.getInstance()
-        .getProperty(CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING,
-            CarbonCommonConstants.USE_OFFHEAP_IN_QUERY_PROCSSING_DEFAULT));
-    if (offHeap) {
-      memoryAllocator = MemoryAllocator.UNSAFE;
-    } else {
-      memoryAllocator = MemoryAllocator.HEAP;
-    }
-  }
-
-  public MemoryAllocator getMemoryAllocator() {
-    return memoryAllocator;
-  }
-}

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
index 06f907d..4222e14 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/memory/UnsafeMemoryManager.java
@@ -96,9 +96,11 @@ public class UnsafeMemoryManager {
         taskIdToMemoryBlockMap.put(taskId, listOfMemoryBlock);
       }
       listOfMemoryBlock.add(allocate);
-      LOGGER.info("Memory block (" + allocate + ") is created with size " + 
allocate.size()
-          + ". Total memory used " + memoryUsed + "Bytes, left " + 
(totalMemory - memoryUsed)
-          + "Bytes");
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug("Memory block (" + allocate + ") is created with size " + 
allocate.size()
+            + ". Total memory used " + memoryUsed + "Bytes, left " + 
(totalMemory - memoryUsed)
+            + "Bytes");
+      }
       return allocate;
     }
     return null;
@@ -112,9 +114,11 @@ public class UnsafeMemoryManager {
       allocator.free(memoryBlock);
       memoryUsed -= memoryBlock.size();
       memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-      LOGGER.info(
-          "Freeing memory of size: " + memoryBlock.size() + "available memory: 
 " + (totalMemory
-              - memoryUsed));
+      if (LOGGER.isDebugEnabled()) {
+        LOGGER.debug(
+            "Freeing memory of size: " + memoryBlock.size() + "available 
memory:  " + (totalMemory
+                - memoryUsed));
+      }
     }
   }
 
@@ -140,6 +144,8 @@ public class UnsafeMemoryManager {
           "Freeing memory of size: " + occuppiedMemory + ": Current available 
memory is: " + (
               totalMemory - memoryUsed));
     }
+    LOGGER.info("Total memory used after task " + taskId + " is " + memoryUsed
+        + " Current tasks running now are : " + 
taskIdToMemoryBlockMap.keySet());
   }
 
   public synchronized boolean isMemoryAvailable() {
@@ -160,6 +166,7 @@ public class UnsafeMemoryManager {
       baseBlock = INSTANCE.allocateMemory(taskId, size);
       if (baseBlock == null) {
         try {
+          LOGGER.info("Memory is not available, retry after 500 millis");
           Thread.sleep(500);
         } catch (InterruptedException e) {
           throw new MemoryException(e);
@@ -170,6 +177,8 @@ public class UnsafeMemoryManager {
       tries++;
     }
     if (baseBlock == null) {
+      LOGGER.error(" Memory Used : " + INSTANCE.memoryUsed + " Tasks running : 
"
+          + taskIdToMemoryBlockMap.keySet());
       throw new MemoryException("Not enough memory");
     }
     return baseBlock;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
index f159744..e8e7bfb 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/executor/impl/AbstractQueryExecutor.java
@@ -155,10 +155,10 @@ public abstract class AbstractQueryExecutor<E> implements 
QueryExecutor<E> {
               queryModel.getAbsoluteTableIdentifier());
       cache.removeTableBlocksIfHorizontalCompactionDone(queryModel);
       queryProperties.dataBlocks = cache.getAll(tableBlockUniqueIdentifiers);
-      queryStatistic
-          .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, 
System.currentTimeMillis());
-      queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
     }
+    queryStatistic
+        .addStatistics(QueryStatisticsConstants.LOAD_BLOCKS_EXECUTOR, 
System.currentTimeMillis());
+    queryProperties.queryStatisticsRecorder.recordStatistics(queryStatistic);
     // calculating the total number of aggeragted columns
     int aggTypeCount = queryModel.getQueryMeasures().size();
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
index c2e077e..63472f9 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RangeValueFilterExecuterImpl.java
@@ -552,11 +552,15 @@ public class RangeValueFilterExecuterImpl extends 
ValueBasedFilterExecuterImpl {
       if 
(dimColEvaluatorInfo.getDimension().hasEncoding(Encoding.DIRECT_DICTIONARY)) {
         DirectDictionaryGenerator directDictionaryGenerator = 
DirectDictionaryKeyGeneratorFactory
             
.getDirectDictionaryGenerator(dimColEvaluatorInfo.getDimension().getDataType());
-        int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 
1;
+        int key = directDictionaryGenerator.generateDirectSurrogateKey(null);
         CarbonDimension currentBlockDimension =
             segmentProperties.getDimensions().get(dimensionBlocksIndex);
-        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-            this.segmentProperties.getSortColumnsGenerator());
+        if (currentBlockDimension.isSortColumn()) {
+          defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
+              this.segmentProperties.getSortColumnsGenerator());
+        } else {
+          defaultValue = ByteUtil.toBytes(key);
+        }
       } else {
         defaultValue = CarbonCommonConstants.MEMBER_DEFAULT_VAL_ARRAY;
       }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
index 63c9395..50231d6 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanEqualFilterExecuterImpl.java
@@ -268,8 +268,12 @@ public class RowLevelRangeLessThanEqualFilterExecuterImpl 
extends RowLevelFilter
       int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
       CarbonDimension currentBlockDimension =
           segmentProperties.getDimensions().get(dimensionBlocksIndex[0]);
-      defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-          this.segmentProperties.getSortColumnsGenerator());
+      if (currentBlockDimension.isSortColumn()) {
+        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
+            this.segmentProperties.getSortColumnsGenerator());
+      } else {
+        defaultValue = ByteUtil.toBytes(key);
+      }
     }
     BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
index 86ded59..1972f8e 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/scan/filter/executer/RowLevelRangeLessThanFiterExecuterImpl.java
@@ -270,8 +270,12 @@ public class RowLevelRangeLessThanFiterExecuterImpl 
extends RowLevelFilterExecut
       int key = directDictionaryGenerator.generateDirectSurrogateKey(null) + 1;
       CarbonDimension currentBlockDimension =
           segmentProperties.getDimensions().get(dimensionBlocksIndex[0]);
-      defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
-          this.segmentProperties.getSortColumnsGenerator());
+      if (currentBlockDimension.isSortColumn()) {
+        defaultValue = FilterUtil.getMaskKey(key, currentBlockDimension,
+            this.segmentProperties.getSortColumnsGenerator());
+      } else {
+        defaultValue = ByteUtil.toBytes(key);
+      }
     }
     BitSet bitSet = null;
     if (dimensionColumnDataChunk.isExplicitSorted()) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
----------------------------------------------------------------------
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
index e571ccf..62a97f9 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/AbstractRecordReader.java
@@ -39,7 +39,5 @@ public abstract class AbstractRecordReader<T> extends 
RecordReader<Void, T> {
     QueryStatistic queryStatistic = new QueryStatistic();
     queryStatistic.addCountStatistic(QueryStatisticsConstants.RESULT_SIZE, 
recordCount);
     recorder.recordStatistics(queryStatistic);
-    // print executor query statistics for each task_id
-    recorder.logStatisticsAsTableExecutor();
   }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/887310fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
index 0035c44..1c08307 100644
--- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
+++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonScanRDD.scala
@@ -21,6 +21,7 @@ import java.text.SimpleDateFormat
 import java.util.{ArrayList, Date, List}
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.mapreduce._
@@ -54,7 +55,7 @@ class CarbonScanRDD(
     columnProjection: CarbonProjection,
     filterExpression: Expression,
     identifier: AbsoluteTableIdentifier,
-    serializedTableInfo: Array[Byte],
+    @transient serializedTableInfo: Array[Byte],
     @transient tableInfo: TableInfo, inputMetricsStats: InitInputMetrics)
   extends CarbonRDDWithTableInfo[InternalRow](sc, Nil, serializedTableInfo) {
 
@@ -147,13 +148,30 @@ class CarbonScanRDD(
         }
         noOfNodes = nodeBlockMapping.size
       } else {
-        splits.asScala.zipWithIndex.foreach { splitWithIndex =>
-          val multiBlockSplit =
-            new CarbonMultiBlockSplit(identifier,
-              Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
-              splitWithIndex._1.getLocations)
-          val partition = new CarbonSparkPartition(id, splitWithIndex._2, 
multiBlockSplit)
-          result.add(partition)
+        if (CarbonProperties.getInstance()
+          .getProperty(CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION,
+            
CarbonCommonConstants.CARBON_USE_BLOCKLET_DISTRIBUTION_DEFAULT).toBoolean) {
+          // Use blocklet distribution
+          // Randomize the blocklets for better shuffling
+          Random.shuffle(splits.asScala).zipWithIndex.foreach { splitWithIndex 
=>
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier,
+                Seq(splitWithIndex._1.asInstanceOf[CarbonInputSplit]).asJava,
+                splitWithIndex._1.getLocations)
+            val partition = new CarbonSparkPartition(id, splitWithIndex._2, 
multiBlockSplit)
+            result.add(partition)
+          }
+        } else {
+          // Use block distribution
+          splits.asScala.map(_.asInstanceOf[CarbonInputSplit]).
+            groupBy(f => f.getBlockPath).values.zipWithIndex.foreach { 
splitWithIndex =>
+            val multiBlockSplit =
+              new CarbonMultiBlockSplit(identifier,
+                splitWithIndex._1.asJava,
+                splitWithIndex._1.flatMap(f => 
f.getLocations).distinct.toArray)
+            val partition = new CarbonSparkPartition(id, splitWithIndex._2, 
multiBlockSplit)
+            result.add(partition)
+          }
         }
       }
 
@@ -176,7 +194,7 @@ class CarbonScanRDD(
   }
 
   override def internalCompute(split: Partition, context: TaskContext): 
Iterator[InternalRow] = {
-
+    val queryStartTime = System.currentTimeMillis
     val carbonPropertiesFilePath = 
System.getProperty("carbon.properties.filepath", null)
     if (null == carbonPropertiesFilePath) {
       System.setProperty("carbon.properties.filepath",
@@ -209,16 +227,15 @@ class CarbonScanRDD(
       }
 
       reader.initialize(inputSplit, attemptContext)
-      val queryStartTime = System.currentTimeMillis
 
       new Iterator[Any] {
         private var havePair = false
         private var finished = false
 
         context.addTaskCompletionListener { context =>
-          logStatistics(queryStartTime, model.getStatisticsRecorder)
           reader.close()
-        close()
+          close()
+          logStatistics(queryStartTime, model.getStatisticsRecorder)
         }
 
         override def hasNext: Boolean = {
@@ -288,6 +305,8 @@ class CarbonScanRDD(
     
queryStatistic.addFixedTimeStatistic(QueryStatisticsConstants.EXECUTOR_PART,
       System.currentTimeMillis - queryStartTime)
     recorder.recordStatistics(queryStatistic)
+    // print executor query statistics for each task_id
+    recorder.logStatisticsAsTableExecutor()
   }
 
   /**

Reply via email to