Repository: incubator-carbondata
Updated Branches:
  refs/heads/master 299f93ba6 -> 38c5cd656


Added stats such as rows processed in each step. Also fixed the unsafe sort
enable issue.

Fixed style

Rebased

Fixed style

Fixed comments

Fixed testcase

Reverted wrong commit of example

Updated pom to use the no-kettle flow by default

Updated pom to remove with-kettle profile


Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/30f575f4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/30f575f4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/30f575f4

Branch: refs/heads/master
Commit: 30f575f4b009847e6eab9ff7d451419d1f9b0d46
Parents: 299f93b
Author: ravipesala <ravi.pes...@gmail.com>
Authored: Wed Jan 25 15:31:56 2017 +0530
Committer: jackylk <jacky.li...@huawei.com>
Committed: Fri Jan 27 23:49:59 2017 +0800

----------------------------------------------------------------------
 .../carbondata/common/logging/LogService.java   |  6 +++
 .../examples/CarbonSessionExample.scala         | 19 ++-------
 integration/spark-common-test/pom.xml           |  2 +-
 integration/spark/pom.xml                       |  2 +-
 integration/spark2/pom.xml                      |  2 +-
 .../execution/CarbonLateDecodeStrategy.scala    |  9 +++--
 pom.xml                                         |  8 +---
 .../newflow/AbstractDataLoadProcessorStep.java  | 42 +++++++++++++++++++-
 .../processing/newflow/row/CarbonRowBatch.java  |  3 ++
 .../sort/impl/ParallelReadMergeSorterImpl.java  | 17 +++++---
 ...arallelReadMergeSorterWithBucketingImpl.java | 19 +++++----
 .../impl/UnsafeParallelReadMergeSorterImpl.java | 17 +++++---
 .../sort/unsafe/UnsafeMemoryManager.java        |  6 +--
 .../steps/DataConverterProcessorStepImpl.java   | 13 ++++--
 ...ConverterProcessorWithBucketingStepImpl.java | 13 ++++--
 .../steps/DataWriterProcessorStepImpl.java      | 12 +++---
 .../newflow/steps/DummyClassForTest.java        |  4 ++
 .../newflow/steps/InputProcessorStepImpl.java   | 23 ++++++++---
 .../newflow/steps/SortProcessorStepImpl.java    | 16 +++++---
 19 files changed, 158 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/common/src/main/java/org/apache/carbondata/common/logging/LogService.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/carbondata/common/logging/LogService.java 
b/common/src/main/java/org/apache/carbondata/common/logging/LogService.java
index 4ba8f0a..ee02aba 100644
--- a/common/src/main/java/org/apache/carbondata/common/logging/LogService.java
+++ b/common/src/main/java/org/apache/carbondata/common/logging/LogService.java
@@ -42,4 +42,10 @@ public interface LogService {
    * @param message statistic message
    */
   void statistic(String message);
+
+  boolean isDebugEnabled();
+
+  boolean isWarnEnabled();
+
+  boolean isInfoEnabled();
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
----------------------------------------------------------------------
diff --git 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
index 0d9c43f..1d485cd 100644
--- 
a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
+++ 
b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala
@@ -19,7 +19,6 @@ package org.apache.carbondata.examples
 
 import java.io.File
 
-import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.SparkSession
 
 import org.apache.carbondata.core.constants.CarbonCommonConstants
@@ -32,19 +31,10 @@ object CarbonSessionExample {
                             + "../../../..").getCanonicalPath
     val storeLocation = s"$rootPath/examples/spark2/target/store"
     val warehouse = s"$rootPath/examples/spark2/target/warehouse"
-    val metastoredb = s"$rootPath/examples/spark2/target/metastore_db"
-
-    // clean data folder
-    if (true) {
-      val clean = (path: String) => FileUtils.deleteDirectory(new File(path))
-      clean(storeLocation)
-      clean(warehouse)
-      clean(metastoredb)
-    }
+    val metastoredb = s"$rootPath/examples/spark2/target"
 
     CarbonProperties.getInstance()
       .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins")
-      .addProperty("carbon.storelocation", storeLocation)
       .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd")
 
     import org.apache.spark.sql.CarbonSession._
@@ -52,12 +42,9 @@ object CarbonSessionExample {
     val spark = SparkSession
       .builder()
       .master("local")
-      .appName("CarbonExample")
-      .enableHiveSupport()
+      .appName("CarbonSessionExample")
       .config("spark.sql.warehouse.dir", warehouse)
-      .config("javax.jdo.option.ConnectionURL",
-    s"jdbc:derby:;databaseName=$metastoredb;create=true")
-      .getOrCreateCarbonSession()
+      .getOrCreateCarbonSession(storeLocation, metastoredb)
 
     spark.sparkContext.setLogLevel("WARN")
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark-common-test/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark-common-test/pom.xml 
b/integration/spark-common-test/pom.xml
index 03144ef..29f0cad 100644
--- a/integration/spark-common-test/pom.xml
+++ b/integration/spark-common-test/pom.xml
@@ -147,7 +147,7 @@
           </environmentVariables>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
-            <use.kettle>${use.kettle}</use.kettle>
+            <use_kettle>${use.kettle}</use_kettle>
           </systemProperties>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml
index ad921c0..7693bde 100644
--- a/integration/spark/pom.xml
+++ b/integration/spark/pom.xml
@@ -175,7 +175,7 @@
           </environmentVariables>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
-            <use.kettle>${use.kettle}</use.kettle>
+            <use_kettle>${use.kettle}</use_kettle>
           </systemProperties>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark2/pom.xml
----------------------------------------------------------------------
diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml
index 2c125c5..ca8f33c 100644
--- a/integration/spark2/pom.xml
+++ b/integration/spark2/pom.xml
@@ -156,7 +156,7 @@
           </environmentVariables>
           <systemProperties>
             <java.awt.headless>true</java.awt.headless>
-            <use.kettle>${use.kettle}</use.kettle>
+            <use_kettle>${use.kettle}</use_kettle>
           </systemProperties>
         </configuration>
         <executions>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
----------------------------------------------------------------------
diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
index 3e0e9c0..549cc1e 100644
--- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
+++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala
@@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{AtomicType, IntegerType}
 import org.apache.carbondata.core.constants.CarbonCommonConstants
 import org.apache.carbondata.core.metadata.schema.BucketingInfo
 import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.util.CarbonProperties
 import org.apache.carbondata.spark.CarbonAliasDecoderRelation
 import org.apache.carbondata.spark.rdd.CarbonScanRDD
 import org.apache.carbondata.spark.util.CarbonScalaUtil
@@ -481,15 +482,17 @@ private[sql] class CarbonLateDecodeStrategy extends 
SparkStrategy {
   }
 
   def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): 
Boolean = {
-    val enableReader = {
+    val vectorizedReader = {
       if 
(sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER))
 {
         
sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER)
+      } else if 
(System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) {
+        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER)
       } else {
-        System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
+        
CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER,
           CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT)
       }
     }
-    sqlContext.conf.wholeStageEnabled && enableReader.toBoolean &&
+    sqlContext.conf.wholeStageEnabled && vectorizedReader.toBoolean &&
       cols.forall(_.dataType.isInstanceOf[AtomicType])
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2bfa539..bb50808 100644
--- a/pom.xml
+++ b/pom.xml
@@ -106,7 +106,7 @@
     <snappy.version>1.1.2.6</snappy.version>
     <hadoop.version>2.2.0</hadoop.version>
     <kettle.version>4.4.0-stable</kettle.version>
-    <use.kettle>true</use.kettle>
+    <use.kettle>false</use.kettle>
     <hadoop.deps.scope>compile</hadoop.deps.scope>
     <spark.deps.scope>compile</spark.deps.scope>
     <scala.deps.scope>compile</scala.deps.scope>
@@ -408,12 +408,6 @@
       <id>include-all</id>
     </profile>
     <profile>
-      <id>no-kettle</id>
-      <properties>
-        <use.kettle>false</use.kettle>
-      </properties>
-    </profile>
-    <profile>
       <id>rat</id>
       <build>
         <plugins>

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
index 9961662..18d6aeb 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java
@@ -19,8 +19,11 @@ package org.apache.carbondata.processing.newflow;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
+import org.apache.carbondata.common.logging.LogService;
+import org.apache.carbondata.common.logging.LogServiceFactory;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
@@ -36,14 +39,40 @@ import 
org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
  */
 public abstract class AbstractDataLoadProcessorStep {
 
+  private static final LogService LOGGER =
+      
LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName());
+
   protected CarbonDataLoadConfiguration configuration;
 
   protected AbstractDataLoadProcessorStep child;
 
+  protected AtomicLong rowCounter;
+
+  protected boolean closed;
+
   public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration 
configuration,
       AbstractDataLoadProcessorStep child) {
     this.configuration = configuration;
     this.child = child;
+    this.rowCounter = new AtomicLong();
+    this.closed = false;
+
+    if (LOGGER.isInfoEnabled()) {
+      // This thread prints the rows processed in each step for every 10 
seconds.
+      new Thread() {
+        @Override public void run() {
+          while (!closed) {
+            try {
+              LOGGER.info("Rows processed in step " + getStepName() + " : " + 
rowCounter.get());
+              Thread.sleep(10000);
+            } catch (InterruptedException e) {
+              //ignore
+              LOGGER.error(e.getMessage());
+            }
+          }
+        }
+      }.start();
+    }
   }
 
   /**
@@ -115,14 +144,23 @@ public abstract class AbstractDataLoadProcessorStep {
    */
   protected abstract CarbonRow processRow(CarbonRow row);
 
+  /**
+   * Get the step name for logging purpose.
+   * @return Step name
+   */
+  protected abstract String getStepName();
+
 
   /**
    * Close all resources.This method is called after execute() is finished.
    * It will be called in both success and failure cases.
    */
   public void close() {
-    if (child != null) {
-      child.close();
+    if (!closed) {
+      closed = true;
+      if (child != null) {
+        child.close();
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
index 1ed5138..941b51d 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java
@@ -36,5 +36,8 @@ public class CarbonRowBatch {
     return rowBatch.iterator();
   }
 
+  public int getSize() {
+    return rowBatch.size();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
index 59697a1..16c5122 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -29,7 +30,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.DataField;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
@@ -60,7 +60,10 @@ public class ParallelReadMergeSorterImpl implements Sorter {
 
   private SingleThreadFinalSortFilesMerger finalMerger;
 
-  public ParallelReadMergeSorterImpl(DataField[] inputDataFields) {
+  private AtomicLong rowCounter;
+
+  public ParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
   }
 
   @Override
@@ -98,7 +101,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
     try {
       for (int i = 0; i < iterators.length; i++) {
         executorService.submit(
-            new SortIteratorThread(iterators[i], sortDataRow, sortParameters, 
batchSize));
+            new SortIteratorThread(iterators[i], sortDataRow, batchSize, 
rowCounter));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
@@ -182,11 +185,14 @@ public class ParallelReadMergeSorterImpl implements 
Sorter {
 
     private Object[][] buffer;
 
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows 
sortDataRows,
-        SortParameters parameters, int batchSize) {
+    private AtomicLong rowCounter;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        SortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
     }
 
     @Override
@@ -204,6 +210,7 @@ public class ParallelReadMergeSorterImpl implements Sorter {
           }
           if (i > 0) {
             sortDataRows.addRowBatch(buffer, i);
+            rowCounter.getAndAdd(i);
           }
         }
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index 3b43b46..245302f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -22,6 +22,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -30,7 +31,6 @@ import 
org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.metadata.schema.BucketingInfo;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.DataField;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
@@ -63,13 +63,13 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
 
   private BucketingInfo bucketingInfo;
 
-  private DataField[] inputDataFields;
-
   private int sortBufferSize;
 
-  public ParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields,
+  private AtomicLong rowCounter;
+
+  public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter,
       BucketingInfo bucketingInfo) {
-    this.inputDataFields = inputDataFields;
+    this.rowCounter = rowCounter;
     this.bucketingInfo = bucketingInfo;
   }
 
@@ -103,7 +103,7 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
     final int batchSize = CarbonProperties.getInstance().getBatchSize();
     try {
       for (int i = 0; i < iterators.length; i++) {
-        executorService.submit(new SortIteratorThread(iterators[i], 
sortDataRows));
+        executorService.submit(new SortIteratorThread(iterators[i], 
sortDataRows, rowCounter));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
@@ -196,9 +196,13 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
 
     private SortDataRows[] sortDataRows;
 
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortDataRows[] sortDataRows) {
+    private AtomicLong rowCounter;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
SortDataRows[] sortDataRows,
+        AtomicLong rowCounter) {
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
+      this.rowCounter = rowCounter;
     }
 
     @Override public Void call() throws CarbonDataLoadingException {
@@ -213,6 +217,7 @@ public class ParallelReadMergeSorterWithBucketingImpl 
implements Sorter {
               SortDataRows sortDataRow = sortDataRows[row.bucketNumber];
               synchronized (sortDataRow) {
                 sortDataRow.addRow(row.getData());
+                rowCounter.getAndAdd(1);
               }
             }
           }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
index 4e4763f..d40b763 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java
@@ -23,6 +23,7 @@ import java.util.concurrent.Callable;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.common.logging.LogService;
@@ -30,7 +31,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory;
 import org.apache.carbondata.core.constants.CarbonCommonConstants;
 import org.apache.carbondata.core.util.CarbonProperties;
 import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory;
-import org.apache.carbondata.processing.newflow.DataField;
 import 
org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException;
 import org.apache.carbondata.processing.newflow.row.CarbonRow;
 import org.apache.carbondata.processing.newflow.row.CarbonRowBatch;
@@ -62,7 +62,10 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
 
   private UnsafeSingleThreadFinalSortFilesMerger finalMerger;
 
-  public UnsafeParallelReadMergeSorterImpl(DataField[] inputDataFields) {
+  private AtomicLong rowCounter;
+
+  public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) {
+    this.rowCounter = rowCounter;
   }
 
   @Override public void initialize(SortParameters sortParameters) {
@@ -93,7 +96,7 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
     try {
       for (int i = 0; i < iterators.length; i++) {
         executorService
-            .submit(new SortIteratorThread(iterators[i], sortDataRow, 
sortParameters, batchSize));
+            .submit(new SortIteratorThread(iterators[i], sortDataRow, 
batchSize, rowCounter));
       }
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.DAYS);
@@ -177,11 +180,14 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
 
     private Object[][] buffer;
 
-    public SortIteratorThread(Iterator<CarbonRowBatch> iterator, 
UnsafeSortDataRows sortDataRows,
-        SortParameters parameters, int batchSize) {
+    private AtomicLong rowCounter;
+
+    public SortIteratorThread(Iterator<CarbonRowBatch> iterator,
+        UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) 
{
       this.iterator = iterator;
       this.sortDataRows = sortDataRows;
       this.buffer = new Object[batchSize][];
+      this.rowCounter = rowCounter;
     }
 
     @Override public Void call() throws CarbonDataLoadingException {
@@ -198,6 +204,7 @@ public class UnsafeParallelReadMergeSorterImpl implements 
Sorter {
           }
           if (i > 0) {
             sortDataRows.addRowBatch(buffer, i);
+            rowCounter.getAndAdd(i);
           }
         }
       } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
index c7528ab..7e7bb7e 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java
@@ -68,14 +68,14 @@ public class UnsafeMemoryManager {
     this.totalMemory = totalMemory;
     this.allocator = allocator;
     minimumMemory = (long) (totalMemory * ((double) 10 / 100));
-    LOGGER.audit("Memory manager is created with size " + totalMemory + " with 
" + allocator
+    LOGGER.info("Memory manager is created with size " + totalMemory + " with 
" + allocator
         + " and minimum reserve memory " + minimumMemory);
   }
   public synchronized MemoryBlock allocateMemory(long memoryRequested) {
     if (memoryUsed + memoryRequested <= totalMemory) {
       MemoryBlock allocate = allocator.allocate(memoryRequested);
       memoryUsed += allocate.size();
-      LOGGER.audit("Memory block is created with size "  + allocate.size() +
+      LOGGER.info("Memory block is created with size "  + allocate.size() +
           " Total memory used " + memoryUsed + " memory left " + 
(getAvailableMemory()));
       return allocate;
     }
@@ -86,7 +86,7 @@ public class UnsafeMemoryManager {
     allocator.free(memoryBlock);
     memoryUsed -= memoryBlock.size();
     memoryUsed = memoryUsed < 0 ? 0 : memoryUsed;
-    LOGGER.audit(
+    LOGGER.info(
         "Memory released, memory used " + memoryUsed + " memory left " + 
(getAvailableMemory()));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
index 45105c6..1a6535f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java
@@ -94,6 +94,7 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
     while (batchIterator.hasNext()) {
       newBatch.addRow(localConverter.convert(batchIterator.next()));
     }
+    rowCounter.getAndAdd(newBatch.getSize());
     return newBatch;
   }
 
@@ -152,9 +153,15 @@ public class DataConverterProcessorStepImpl extends 
AbstractDataLoadProcessorSte
 
   @Override
   public void close() {
-    super.close();
-    if (converter != null) {
-      converter.finish();
+    if (!closed) {
+      super.close();
+      if (converter != null) {
+        converter.finish();
+      }
     }
   }
+
+  @Override protected String getStepName() {
+    return "Data Converter";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
index 5f06730..0223b04 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java
@@ -121,6 +121,7 @@ public class DataConverterProcessorWithBucketingStepImpl 
extends AbstractDataLoa
       convertRow.bucketNumber = (short) 
partitioner.getPartition(next.getData());
       newBatch.addRow(convertRow);
     }
+    rowCounter.getAndAdd(newBatch.getSize());
     return newBatch;
   }
 
@@ -179,9 +180,15 @@ public class DataConverterProcessorWithBucketingStepImpl 
extends AbstractDataLoa
 
   @Override
   public void close() {
-    super.close();
-    if (converter != null) {
-      converter.finish();
+    if (!closed) {
+      super.close();
+      if (converter != null) {
+        converter.finish();
+      }
     }
   }
+
+  @Override protected String getStepName() {
+    return "Data Converter with Bucketing";
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
index b9544d0..710cc4f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java
@@ -60,8 +60,6 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
   private long readCounter;
 
-  private long writeCounter;
-
   private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex();
 
   private int noDimByteArrayIndex = 
IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex();
@@ -141,8 +139,8 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
     return null;
   }
 
-  @Override public void close() {
-
+  @Override protected String getStepName() {
+    return "Data Writer";
   }
 
   private void finish(String tableName, CarbonFactHandler dataHandler) {
@@ -154,9 +152,9 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
     LOGGER.info("Record Processed For table: " + tableName);
     String logMessage =
         "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + 
": Write: "
-            + writeCounter;
+            + rowCounter.get();
     LOGGER.info(logMessage);
-    
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter);
+    
CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
     processingComplete(dataHandler);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(),
@@ -210,11 +208,11 @@ public class DataWriterProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
         outputRow[outputRow.length - 1] = 
keyGenerator.generateKey(highCardExcludedRows);
         dataHandler.addDataToStore(outputRow);
-        writeCounter++;
       }
     } catch (Exception e) {
       throw new CarbonDataLoadingException("unable to generate the mdkey", e);
     }
+    rowCounter.getAndAdd(batch.getSize());
   }
 
   @Override protected CarbonRow processRow(CarbonRow row) {

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
index 8130cf7..a7d8e7f 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java
@@ -49,6 +49,10 @@ public class DummyClassForTest extends 
AbstractDataLoadProcessorStep {
 
   }
 
+  @Override protected String getStepName() {
+    return "Dummy";
+  }
+
   @Override public Iterator<CarbonRowBatch>[] execute() throws 
CarbonDataLoadingException {
     Iterator<CarbonRowBatch>[] iterators = child.execute();
     this.executorService = Executors.newFixedThreadPool(iterators.length);

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
index c0bf50a..0097690 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java
@@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
+import java.util.concurrent.atomic.AtomicLong;
 
 import org.apache.carbondata.common.CarbonIterator;
 import org.apache.carbondata.core.util.CarbonProperties;
@@ -72,7 +73,7 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
     for (int i = 0; i < outIterators.length; i++) {
       outIterators[i] =
           new InputProcessorIterator(readerIterators[i], rowParser, batchSize,
-              configuration.isPreFetch(), executorService);
+              configuration.isPreFetch(), executorService, rowCounter);
     }
     return outIterators;
   }
@@ -104,12 +105,19 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
   }
 
   @Override public void close() {
-    executorService.shutdown();
-    for (CarbonIterator inputIterator : inputIterators) {
-      inputIterator.close();
+    if (!closed) {
+      super.close();
+      executorService.shutdown();
+      for (CarbonIterator inputIterator : inputIterators) {
+        inputIterator.close();
+      }
     }
   }
 
+  @Override protected String getStepName() {
+    return "Input Processor";
+  }
+
   /**
    * This iterator wraps the list of iterators and it starts iterating the each
    * iterator of the list one by one. It also parse the data while iterating 
it.
@@ -136,8 +144,11 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
     private boolean preFetch;
 
+    private AtomicLong rowCounter;
+
     public InputProcessorIterator(List<CarbonIterator<Object[]>> 
inputIterators,
-        RowParser rowParser, int batchSize, boolean preFetch, ExecutorService 
executorService) {
+        RowParser rowParser, int batchSize, boolean preFetch, ExecutorService 
executorService,
+        AtomicLong rowCounter) {
       this.inputIterators = inputIterators;
       this.batchSize = batchSize;
       this.rowParser = rowParser;
@@ -145,6 +156,7 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
       // Get the first iterator from the list.
       currentIterator = inputIterators.get(counter++);
       this.executorService = executorService;
+      this.rowCounter = rowCounter;
       this.preFetch = preFetch;
       this.nextBatch = false;
       this.firstTime = true;
@@ -222,6 +234,7 @@ public class InputProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
         carbonRowBatch.addRow(new 
CarbonRow(rowParser.parseRow(currentIterator.next())));
         count++;
       }
+      rowCounter.getAndAdd(carbonRowBatch.getSize());
       return carbonRowBatch;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
----------------------------------------------------------------------
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
index dc048b9..bd4b0e6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java
@@ -59,15 +59,13 @@ public class SortProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
         .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT,
             CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT));
     if (offheapsort) {
-      sorter = new UnsafeParallelReadMergeSorterImpl(child.getOutput());
+      sorter = new UnsafeParallelReadMergeSorterImpl(rowCounter);
     } else {
-      sorter = new ParallelReadMergeSorterImpl(child.getOutput());
+      sorter = new ParallelReadMergeSorterImpl(rowCounter);
     }
     if (configuration.getBucketingInfo() != null) {
-      sorter = new ParallelReadMergeSorterWithBucketingImpl(child.getOutput(),
+      sorter = new ParallelReadMergeSorterWithBucketingImpl(rowCounter,
           configuration.getBucketingInfo());
-    } else {
-      sorter = new ParallelReadMergeSorterImpl(child.getOutput());
     }
     sorter.initialize(sortParameters);
   }
@@ -87,7 +85,13 @@ public class SortProcessorStepImpl extends 
AbstractDataLoadProcessorStep {
 
   @Override
   public void close() {
-    sorter.close();
+    if (!closed) {
+      super.close();
+      sorter.close();
+    }
   }
 
+  @Override protected String getStepName() {
+    return "Sort Processor";
+  }
 }


Reply via email to