Repository: incubator-carbondata Updated Branches: refs/heads/master 299f93ba6 -> 38c5cd656
Added stats like rows processed in each step. And also fixes unsafe sort enable issue. Fixed style Rebased Fixed style Fixed comments Fixed testcase Reverted wrong commit of example Updated pom to use default no kettle flow in pom Updated pom to remove with-kettle profile Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/30f575f4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/30f575f4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/30f575f4 Branch: refs/heads/master Commit: 30f575f4b009847e6eab9ff7d451419d1f9b0d46 Parents: 299f93b Author: ravipesala <ravi.pes...@gmail.com> Authored: Wed Jan 25 15:31:56 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Fri Jan 27 23:49:59 2017 +0800 ---------------------------------------------------------------------- .../carbondata/common/logging/LogService.java | 6 +++ .../examples/CarbonSessionExample.scala | 19 ++------- integration/spark-common-test/pom.xml | 2 +- integration/spark/pom.xml | 2 +- integration/spark2/pom.xml | 2 +- .../execution/CarbonLateDecodeStrategy.scala | 9 +++-- pom.xml | 8 +--- .../newflow/AbstractDataLoadProcessorStep.java | 42 +++++++++++++++++++- .../processing/newflow/row/CarbonRowBatch.java | 3 ++ .../sort/impl/ParallelReadMergeSorterImpl.java | 17 +++++--- ...arallelReadMergeSorterWithBucketingImpl.java | 19 +++++---- .../impl/UnsafeParallelReadMergeSorterImpl.java | 17 +++++--- .../sort/unsafe/UnsafeMemoryManager.java | 6 +-- .../steps/DataConverterProcessorStepImpl.java | 13 ++++-- ...ConverterProcessorWithBucketingStepImpl.java | 13 ++++-- .../steps/DataWriterProcessorStepImpl.java | 12 +++--- .../newflow/steps/DummyClassForTest.java | 4 ++ .../newflow/steps/InputProcessorStepImpl.java | 23 ++++++++--- .../newflow/steps/SortProcessorStepImpl.java | 16 +++++--- 19 files changed, 158 insertions(+), 75 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/common/src/main/java/org/apache/carbondata/common/logging/LogService.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/logging/LogService.java b/common/src/main/java/org/apache/carbondata/common/logging/LogService.java index 4ba8f0a..ee02aba 100644 --- a/common/src/main/java/org/apache/carbondata/common/logging/LogService.java +++ b/common/src/main/java/org/apache/carbondata/common/logging/LogService.java @@ -42,4 +42,10 @@ public interface LogService { * @param message statistic message */ void statistic(String message); + + boolean isDebugEnabled(); + + boolean isWarnEnabled(); + + boolean isInfoEnabled(); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala ---------------------------------------------------------------------- diff --git a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala index 0d9c43f..1d485cd 100644 --- a/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala +++ b/examples/spark2/src/main/scala/org/apache/carbondata/examples/CarbonSessionExample.scala @@ -19,7 +19,6 @@ package org.apache.carbondata.examples import java.io.File -import org.apache.commons.io.FileUtils import org.apache.spark.sql.SparkSession import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -32,19 +31,10 @@ object CarbonSessionExample { + "../../../..").getCanonicalPath val storeLocation = s"$rootPath/examples/spark2/target/store" val warehouse = s"$rootPath/examples/spark2/target/warehouse" - val metastoredb = s"$rootPath/examples/spark2/target/metastore_db" - - // clean data folder - if (true) { - val clean = (path: String) => FileUtils.deleteDirectory(new File(path)) - clean(storeLocation) - clean(warehouse) - clean(metastoredb) - } + val metastoredb = s"$rootPath/examples/spark2/target" CarbonProperties.getInstance() .addProperty("carbon.kettle.home", s"$rootPath/processing/carbonplugins") - .addProperty("carbon.storelocation", storeLocation) .addProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, "yyyy/MM/dd") import org.apache.spark.sql.CarbonSession._ @@ -52,12 +42,9 @@ object CarbonSessionExample { val spark = SparkSession .builder() .master("local") - .appName("CarbonExample") - .enableHiveSupport() + .appName("CarbonSessionExample") .config("spark.sql.warehouse.dir", warehouse) - .config("javax.jdo.option.ConnectionURL", - s"jdbc:derby:;databaseName=$metastoredb;create=true") - .getOrCreateCarbonSession() + .getOrCreateCarbonSession(storeLocation, metastoredb) spark.sparkContext.setLogLevel("WARN") http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark-common-test/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/pom.xml b/integration/spark-common-test/pom.xml index 03144ef..29f0cad 100644 --- a/integration/spark-common-test/pom.xml +++ b/integration/spark-common-test/pom.xml @@ -147,7 +147,7 @@ </environmentVariables> <systemProperties> <java.awt.headless>true</java.awt.headless> - <use.kettle>${use.kettle}</use.kettle> + <use_kettle>${use.kettle}</use_kettle> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark/pom.xml b/integration/spark/pom.xml index ad921c0..7693bde 100644 --- a/integration/spark/pom.xml +++ b/integration/spark/pom.xml @@ -175,7 +175,7 @@ </environmentVariables> <systemProperties> <java.awt.headless>true</java.awt.headless> - <use.kettle>${use.kettle}</use.kettle> + <use_kettle>${use.kettle}</use_kettle> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark2/pom.xml ---------------------------------------------------------------------- diff --git a/integration/spark2/pom.xml b/integration/spark2/pom.xml index 2c125c5..ca8f33c 100644 --- a/integration/spark2/pom.xml +++ b/integration/spark2/pom.xml @@ -156,7 +156,7 @@ </environmentVariables> <systemProperties> <java.awt.headless>true</java.awt.headless> - <use.kettle>${use.kettle}</use.kettle> + <use_kettle>${use.kettle}</use_kettle> </systemProperties> </configuration> <executions> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala index 3e0e9c0..549cc1e 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/CarbonLateDecodeStrategy.scala @@ -36,6 +36,7 @@ import org.apache.spark.sql.types.{AtomicType, IntegerType} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.metadata.schema.BucketingInfo import org.apache.carbondata.core.metadata.schema.table.CarbonTable +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.spark.CarbonAliasDecoderRelation import org.apache.carbondata.spark.rdd.CarbonScanRDD import org.apache.carbondata.spark.util.CarbonScalaUtil @@ -481,15 +482,17 @@ private[sql] class CarbonLateDecodeStrategy extends SparkStrategy { } def supportBatchedDataSource(sqlContext: SQLContext, cols: Seq[Attribute]): Boolean = { - val enableReader = { + val vectorizedReader = { if (sqlContext.sparkSession.conf.contains(CarbonCommonConstants.ENABLE_VECTOR_READER)) { sqlContext.sparkSession.conf.get(CarbonCommonConstants.ENABLE_VECTOR_READER) + } else if (System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) != null) { + System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER) } else { - System.getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, + CarbonProperties.getInstance().getProperty(CarbonCommonConstants.ENABLE_VECTOR_READER, CarbonCommonConstants.ENABLE_VECTOR_READER_DEFAULT) } } - sqlContext.conf.wholeStageEnabled && enableReader.toBoolean && + sqlContext.conf.wholeStageEnabled && vectorizedReader.toBoolean && cols.forall(_.dataType.isInstanceOf[AtomicType]) } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 2bfa539..bb50808 100644 --- a/pom.xml +++ b/pom.xml @@ -106,7 +106,7 @@ <snappy.version>1.1.2.6</snappy.version> <hadoop.version>2.2.0</hadoop.version> <kettle.version>4.4.0-stable</kettle.version> - <use.kettle>true</use.kettle> + <use.kettle>false</use.kettle> <hadoop.deps.scope>compile</hadoop.deps.scope> <spark.deps.scope>compile</spark.deps.scope> <scala.deps.scope>compile</scala.deps.scope> @@ -408,12 +408,6 @@ <id>include-all</id> </profile> <profile> - <id>no-kettle</id> - <properties> - <use.kettle>false</use.kettle> - </properties> - </profile> - <profile> <id>rat</id> <build> <plugins> http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java index 9961662..18d6aeb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/AbstractDataLoadProcessorStep.java @@ -19,8 +19,11 @@ package org.apache.carbondata.processing.newflow; import java.io.IOException; import java.util.Iterator; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; @@ -36,14 +39,40 @@ import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; */ public abstract class AbstractDataLoadProcessorStep { + private static final LogService LOGGER = + LogServiceFactory.getLogService(AbstractDataLoadProcessorStep.class.getName()); + protected CarbonDataLoadConfiguration configuration; protected AbstractDataLoadProcessorStep child; + protected AtomicLong rowCounter; + + protected boolean closed; + public AbstractDataLoadProcessorStep(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) { this.configuration = configuration; this.child = child; + this.rowCounter = new AtomicLong(); + this.closed = false; + + if (LOGGER.isInfoEnabled()) { + // This thread prints the rows processed in each step for every 10 seconds. + new Thread() { + @Override public void run() { + while (!closed) { + try { + LOGGER.info("Rows processed in step " + getStepName() + " : " + rowCounter.get()); + Thread.sleep(10000); + } catch (InterruptedException e) { + //ignore + LOGGER.error(e.getMessage()); + } + } + } + }.start(); + } } /** @@ -115,14 +144,23 @@ public abstract class AbstractDataLoadProcessorStep { */ protected abstract CarbonRow processRow(CarbonRow row); + /** + * Get the step name for logging purpose. + * @return Step name + */ + protected abstract String getStepName(); + /** * Close all resources.This method is called after execute() is finished. * It will be called in both success and failure cases. */ public void close() { - if (child != null) { - child.close(); + if (!closed) { + closed = true; + if (child != null) { + child.close(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java index 1ed5138..941b51d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/row/CarbonRowBatch.java @@ -36,5 +36,8 @@ public class CarbonRowBatch { return rowBatch.iterator(); } + public int getSize() { + return rowBatch.size(); + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java index 59697a1..16c5122 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -29,7 +30,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; @@ -60,7 +60,10 @@ public class ParallelReadMergeSorterImpl implements Sorter { private SingleThreadFinalSortFilesMerger finalMerger; - public ParallelReadMergeSorterImpl(DataField[] inputDataFields) { + private AtomicLong rowCounter; + + public ParallelReadMergeSorterImpl(AtomicLong rowCounter) { + this.rowCounter = rowCounter; } @Override @@ -98,7 +101,7 @@ public class ParallelReadMergeSorterImpl implements Sorter { try { for (int i = 0; i < iterators.length; i++) { executorService.submit( - new SortIteratorThread(iterators[i], sortDataRow, sortParameters, batchSize)); + new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); @@ -182,11 +185,14 @@ public class ParallelReadMergeSorterImpl implements Sorter { private Object[][] buffer; - public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows sortDataRows, - SortParameters parameters, int batchSize) { + private AtomicLong rowCounter; + + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, + SortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.buffer = new Object[batchSize][]; + this.rowCounter = rowCounter; } @Override @@ -204,6 +210,7 @@ public class ParallelReadMergeSorterImpl implements Sorter { } if (i > 0) { sortDataRows.addRowBatch(buffer, i); + rowCounter.getAndAdd(i); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index 3b43b46..245302f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -22,6 +22,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -30,7 +31,6 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.schema.BucketingInfo; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; @@ -63,13 +63,13 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { private BucketingInfo bucketingInfo; - private DataField[] inputDataFields; - private int sortBufferSize; - public ParallelReadMergeSorterWithBucketingImpl(DataField[] inputDataFields, + private AtomicLong rowCounter; + + public ParallelReadMergeSorterWithBucketingImpl(AtomicLong rowCounter, BucketingInfo bucketingInfo) { - this.inputDataFields = inputDataFields; + this.rowCounter = rowCounter; this.bucketingInfo = bucketingInfo; } @@ -103,7 +103,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { final int batchSize = CarbonProperties.getInstance().getBatchSize(); try { for (int i = 0; i < iterators.length; i++) { - executorService.submit(new SortIteratorThread(iterators[i], sortDataRows)); + executorService.submit(new SortIteratorThread(iterators[i], sortDataRows, rowCounter)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); @@ -196,9 +196,13 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { private SortDataRows[] sortDataRows; - public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows) { + private AtomicLong rowCounter; + + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, SortDataRows[] sortDataRows, + AtomicLong rowCounter) { this.iterator = iterator; this.sortDataRows = sortDataRows; + this.rowCounter = rowCounter; } @Override public Void call() throws CarbonDataLoadingException { @@ -213,6 +217,7 @@ public class ParallelReadMergeSorterWithBucketingImpl implements Sorter { SortDataRows sortDataRow = sortDataRows[row.bucketNumber]; synchronized (sortDataRow) { sortDataRow.addRow(row.getData()); + rowCounter.getAndAdd(1); } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java index 4e4763f..d40b763 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/impl/UnsafeParallelReadMergeSorterImpl.java @@ -23,6 +23,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.common.logging.LogService; @@ -30,7 +31,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; -import org.apache.carbondata.processing.newflow.DataField; import org.apache.carbondata.processing.newflow.exception.CarbonDataLoadingException; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; @@ -62,7 +62,10 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { private UnsafeSingleThreadFinalSortFilesMerger finalMerger; - public UnsafeParallelReadMergeSorterImpl(DataField[] inputDataFields) { + private AtomicLong rowCounter; + + public UnsafeParallelReadMergeSorterImpl(AtomicLong rowCounter) { + this.rowCounter = rowCounter; } @Override public void initialize(SortParameters sortParameters) { @@ -93,7 +96,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { try { for (int i = 0; i < iterators.length; i++) { executorService - .submit(new SortIteratorThread(iterators[i], sortDataRow, sortParameters, batchSize)); + .submit(new SortIteratorThread(iterators[i], sortDataRow, batchSize, rowCounter)); } executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.DAYS); @@ -177,11 +180,14 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { private Object[][] buffer; - public SortIteratorThread(Iterator<CarbonRowBatch> iterator, UnsafeSortDataRows sortDataRows, - SortParameters parameters, int batchSize) { + private AtomicLong rowCounter; + + public SortIteratorThread(Iterator<CarbonRowBatch> iterator, + UnsafeSortDataRows sortDataRows, int batchSize, AtomicLong rowCounter) { this.iterator = iterator; this.sortDataRows = sortDataRows; this.buffer = new Object[batchSize][]; + this.rowCounter = rowCounter; } @Override public Void call() throws CarbonDataLoadingException { @@ -198,6 +204,7 @@ public class UnsafeParallelReadMergeSorterImpl implements Sorter { } if (i > 0) { sortDataRows.addRowBatch(buffer, i); + rowCounter.getAndAdd(i); } } } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java index c7528ab..7e7bb7e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeMemoryManager.java @@ -68,14 +68,14 @@ public class UnsafeMemoryManager { this.totalMemory = totalMemory; this.allocator = allocator; minimumMemory = (long) (totalMemory * ((double) 10 / 100)); - LOGGER.audit("Memory manager is created with size " + totalMemory + " with " + allocator + LOGGER.info("Memory manager is created with size " + totalMemory + " with " + allocator + " and minimum reserve memory " + minimumMemory); } public synchronized MemoryBlock allocateMemory(long memoryRequested) { if (memoryUsed + memoryRequested <= totalMemory) { MemoryBlock allocate = allocator.allocate(memoryRequested); memoryUsed += allocate.size(); - LOGGER.audit("Memory block is created with size " + allocate.size() + + LOGGER.info("Memory block is created with size " + allocate.size() + " Total memory used " + memoryUsed + " memory left " + (getAvailableMemory())); return allocate; } @@ -86,7 +86,7 @@ public class UnsafeMemoryManager { allocator.free(memoryBlock); memoryUsed -= memoryBlock.size(); memoryUsed = memoryUsed < 0 ? 0 : memoryUsed; - LOGGER.audit( + LOGGER.info( "Memory released, memory used " + memoryUsed + " memory left " + (getAvailableMemory())); } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java index 45105c6..1a6535f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java @@ -94,6 +94,7 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte while (batchIterator.hasNext()) { newBatch.addRow(localConverter.convert(batchIterator.next())); } + rowCounter.getAndAdd(newBatch.getSize()); return newBatch; } @@ -152,9 +153,15 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte @Override public void close() { - super.close(); - if (converter != null) { - converter.finish(); + if (!closed) { + super.close(); + if (converter != null) { + converter.finish(); + } } } + + @Override protected String getStepName() { + return "Data Converter"; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java index 5f06730..0223b04 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorWithBucketingStepImpl.java @@ -121,6 +121,7 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa convertRow.bucketNumber = (short) partitioner.getPartition(next.getData()); newBatch.addRow(convertRow); } + rowCounter.getAndAdd(newBatch.getSize()); return newBatch; } @@ -179,9 +180,15 @@ public class DataConverterProcessorWithBucketingStepImpl extends AbstractDataLoa @Override public void close() { - super.close(); - if (converter != null) { - converter.finish(); + if (!closed) { + super.close(); + if (converter != null) { + converter.finish(); + } } } + + @Override protected String getStepName() { + return "Data Converter with Bucketing"; + } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java index b9544d0..710cc4f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataWriterProcessorStepImpl.java @@ -60,8 +60,6 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { private long readCounter; - private long writeCounter; - private int measureIndex = IgnoreDictionary.MEASURES_INDEX_IN_ROW.getIndex(); private int noDimByteArrayIndex = IgnoreDictionary.BYTE_ARRAY_INDEX_IN_ROW.getIndex(); @@ -141,8 +139,8 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { return null; } - @Override public void close() { - + @Override protected String getStepName() { + return "Data Writer"; } private void finish(String tableName, CarbonFactHandler dataHandler) { @@ -154,9 +152,9 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { LOGGER.info("Record Processed For table: " + tableName); String logMessage = "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: " - + writeCounter; + + rowCounter.get(); LOGGER.info(logMessage); - CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(writeCounter); + CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), @@ -210,11 +208,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { outputRow[outputRow.length - 1] = keyGenerator.generateKey(highCardExcludedRows); dataHandler.addDataToStore(outputRow); - writeCounter++; } } catch (Exception e) { throw new CarbonDataLoadingException("unable to generate the mdkey", e); } + rowCounter.getAndAdd(batch.getSize()); } @Override protected CarbonRow processRow(CarbonRow row) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java index 8130cf7..a7d8e7f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DummyClassForTest.java @@ -49,6 +49,10 @@ public class DummyClassForTest extends AbstractDataLoadProcessorStep { } + @Override protected String getStepName() { + return "Dummy"; + } + @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { Iterator<CarbonRowBatch>[] iterators = child.execute(); this.executorService = Executors.newFixedThreadPool(iterators.length); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java index c0bf50a..0097690 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/InputProcessorStepImpl.java @@ -24,6 +24,7 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; import org.apache.carbondata.common.CarbonIterator; import org.apache.carbondata.core.util.CarbonProperties; @@ -72,7 +73,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { for (int i = 0; i < outIterators.length; i++) { outIterators[i] = new InputProcessorIterator(readerIterators[i], rowParser, batchSize, - configuration.isPreFetch(), executorService); + configuration.isPreFetch(), executorService, rowCounter); } return outIterators; } @@ -104,12 +105,19 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { } @Override public void close() { - executorService.shutdown(); - for (CarbonIterator inputIterator : inputIterators) { - inputIterator.close(); + if (!closed) { + super.close(); + executorService.shutdown(); + for (CarbonIterator inputIterator : inputIterators) { + inputIterator.close(); + } } } + @Override protected String getStepName() { + return "Input Processor"; + } + /** * This iterator wraps the list of iterators and it starts iterating the each * iterator of the list one by one. It also parse the data while iterating it. @@ -136,8 +144,11 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { private boolean preFetch; + private AtomicLong rowCounter; + public InputProcessorIterator(List<CarbonIterator<Object[]>> inputIterators, - RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService) { + RowParser rowParser, int batchSize, boolean preFetch, ExecutorService executorService, + AtomicLong rowCounter) { this.inputIterators = inputIterators; this.batchSize = batchSize; this.rowParser = rowParser; @@ -145,6 +156,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { // Get the first iterator from the list. currentIterator = inputIterators.get(counter++); this.executorService = executorService; + this.rowCounter = rowCounter; this.preFetch = preFetch; this.nextBatch = false; this.firstTime = true; @@ -222,6 +234,7 @@ public class InputProcessorStepImpl extends AbstractDataLoadProcessorStep { carbonRowBatch.addRow(new CarbonRow(rowParser.parseRow(currentIterator.next()))); count++; } + rowCounter.getAndAdd(carbonRowBatch.getSize()); return carbonRowBatch; } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/30f575f4/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java index dc048b9..bd4b0e6 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/SortProcessorStepImpl.java @@ -59,15 +59,13 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep { .getProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, CarbonCommonConstants.ENABLE_UNSAFE_SORT_DEFAULT)); if (offheapsort) { - sorter = new UnsafeParallelReadMergeSorterImpl(child.getOutput()); + sorter = new UnsafeParallelReadMergeSorterImpl(rowCounter); } else { - sorter = new ParallelReadMergeSorterImpl(child.getOutput()); + sorter = new ParallelReadMergeSorterImpl(rowCounter); } if (configuration.getBucketingInfo() != null) { - sorter = new ParallelReadMergeSorterWithBucketingImpl(child.getOutput(), + sorter = new ParallelReadMergeSorterWithBucketingImpl(rowCounter, configuration.getBucketingInfo()); - } else { - sorter = new ParallelReadMergeSorterImpl(child.getOutput()); } sorter.initialize(sortParameters); } @@ -87,7 +85,13 @@ public class SortProcessorStepImpl extends AbstractDataLoadProcessorStep { @Override public void close() { - sorter.close(); + if (!closed) { + super.close(); + sorter.close(); + } } + @Override protected String getStepName() { + return "Sort Processor"; + } }