Fixed an issue where queries (for example, count(*)) returned more records than expected after an update operation.
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/650263c4 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/650263c4 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/650263c4 Branch: refs/heads/branch-1.1 Commit: 650263c43336e698d35ab6a46db7cb38b9bfddaf Parents: adea5a4 Author: ravipesala <ravi.pes...@gmail.com> Authored: Wed Jun 21 14:14:44 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Sat Jun 24 10:19:41 2017 +0530 ---------------------------------------------------------------------- .../DictionaryBasedVectorResultCollector.java | 6 +++-- .../core/scan/result/AbstractScannedResult.java | 5 +++- .../scan/scanner/AbstractBlockletScanner.java | 19 ++++++++++++++- .../iud/UpdateCarbonTableTestCase.scala | 25 +++++++++++++++++++- 4 files changed, 50 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/650263c4/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java index 3203934..73ccb5d 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/collector/impl/DictionaryBasedVectorResultCollector.java @@ -130,6 +130,7 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC @Override public void collectVectorBatch(AbstractScannedResult scannedResult, CarbonColumnarBatch columnarBatch) { int numberOfPages = scannedResult.numberOfpages(); + int 
filteredRows = 0; while (scannedResult.getCurrentPageCounter() < numberOfPages) { int currentPageRowCount = scannedResult.getCurrentPageRowCount(); if (currentPageRowCount == 0) { @@ -138,13 +139,14 @@ public class DictionaryBasedVectorResultCollector extends AbstractScannedResultC } int rowCounter = scannedResult.getRowCounter(); int availableRows = currentPageRowCount - rowCounter; - int requiredRows = columnarBatch.getBatchSize() - columnarBatch.getActualSize(); + int requiredRows = + columnarBatch.getBatchSize() - (columnarBatch.getActualSize() + filteredRows); requiredRows = Math.min(requiredRows, availableRows); if (requiredRows < 1) { return; } fillColumnVectorDetails(columnarBatch, rowCounter, requiredRows); - int filteredRows = scannedResult + filteredRows = scannedResult .markFilteredRows(columnarBatch, rowCounter, requiredRows, columnarBatch.getRowCounter()); scanAndFillResult(scannedResult, columnarBatch, rowCounter, availableRows, requiredRows); columnarBatch.setActualSize(columnarBatch.getActualSize() + requiredRows - filteredRows); http://git-wip-us.apache.org/repos/asf/carbondata/blob/650263c4/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java index 4e7fd1f..ac3d2b4 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/result/AbstractScannedResult.java @@ -324,6 +324,9 @@ public abstract class AbstractScannedResult { rowCounter = 0; currentRow = -1; pageCounter++; + if (null != deletedRecordMap) { + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); + } } public int numberOfpages() { @@ -479,7 +482,7 @@ public abstract class 
AbstractScannedResult { rowCounter = 0; currentRow = -1; if (null != deletedRecordMap) { - currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + pageCounter + ""); + currentDeleteDeltaVo = deletedRecordMap.get(blockletNumber + "_" + pageCounter); } return hasNext(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/650263c4/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java index f3d1336..022e351 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/scanner/AbstractBlockletScanner.java @@ -19,6 +19,7 @@ package org.apache.carbondata.core.scan.scanner; import java.io.IOException; import org.apache.carbondata.core.constants.CarbonCommonConstants; +import org.apache.carbondata.core.constants.CarbonV3DataFormatConstants; import org.apache.carbondata.core.datastore.chunk.DimensionColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.MeasureColumnDataChunk; import org.apache.carbondata.core.datastore.chunk.impl.DimensionRawColumnChunk; @@ -31,6 +32,7 @@ import org.apache.carbondata.core.scan.result.impl.NonFilterQueryScannedResult; import org.apache.carbondata.core.stats.QueryStatistic; import org.apache.carbondata.core.stats.QueryStatisticsConstants; import org.apache.carbondata.core.stats.QueryStatisticsModel; +import org.apache.carbondata.core.util.CarbonProperties; /** * Blocklet scanner class to process the block @@ -46,6 +48,10 @@ public abstract class AbstractBlockletScanner implements BlockletScanner { private AbstractScannedResult emptyResult; + private static int NUMBER_OF_ROWS_PER_PAGE = Integer.parseInt(CarbonProperties.getInstance() + 
.getProperty(CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE, + CarbonV3DataFormatConstants.NUMBER_OF_ROWS_PER_BLOCKLET_COLUMN_PAGE_DEFAULT)); + public AbstractBlockletScanner(BlockExecutionInfo tableBlockExecutionInfos) { this.blockExecutionInfo = tableBlockExecutionInfos; } @@ -95,7 +101,7 @@ public abstract class AbstractBlockletScanner implements BlockletScanner { } } scannedResult.setMeasureChunks(measureColumnDataChunks); - int[] numberOfRows = new int[] { blocksChunkHolder.getDataBlock().nodeSize() }; + int[] numberOfRows = null; if (blockExecutionInfo.getAllSelectedDimensionBlocksIndexes().length > 0) { for (int i = 0; i < dimensionRawColumnChunks.length; i++) { if (dimensionRawColumnChunks[i] != null) { @@ -111,6 +117,17 @@ public abstract class AbstractBlockletScanner implements BlockletScanner { } } } + // count(*) case there would not be any dimensions are measures selected. + if (numberOfRows == null) { + numberOfRows = new int[blocksChunkHolder.getDataBlock().numberOfPages()]; + for (int i = 0; i < numberOfRows.length; i++) { + numberOfRows[i] = NUMBER_OF_ROWS_PER_PAGE; + } + int lastPageSize = blocksChunkHolder.getDataBlock().nodeSize() % NUMBER_OF_ROWS_PER_PAGE; + if (lastPageSize > 0) { + numberOfRows[numberOfRows.length - 1] = lastPageSize; + } + } scannedResult.setNumberOfRows(numberOfRows); scannedResult.setRawColumnChunks(dimensionRawColumnChunks); // adding statistics for carbon scan time http://git-wip-us.apache.org/repos/asf/carbondata/blob/650263c4/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala index 7917b61..79fda30 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/UpdateCarbonTableTestCase.scala @@ -16,7 +16,7 @@ */ package org.apache.carbondata.spark.testsuite.iud -import org.apache.spark.sql.Row +import org.apache.spark.sql.{Row, SaveMode} import org.apache.spark.sql.common.util.QueryTest import org.scalatest.BeforeAndAfterAll @@ -386,6 +386,29 @@ class UpdateCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { } } + test("More records after update operation ") { + sql("DROP TABLE IF EXISTS default.carbon1") + import sqlContext.implicits._ + val df = sqlContext.sparkContext.parallelize(1 to 36000) + .map(x => (x+"a", "b", x)) + .toDF("c1", "c2", "c3") + df.write + .format("carbondata") + .option("tableName", "carbon1") + .option("tempCSV", "true") + .option("compress", "true") + .mode(SaveMode.Overwrite) + .save() + + checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000))) + + sql("update default.carbon1 set (c1)=('test123') where c1='9999a'").show() + + checkAnswer(sql("select count(*) from default.carbon1"), Seq(Row(36000))) + + sql("DROP TABLE IF EXISTS default.carbon1") + } + override def afterAll { sql("use default") sql("drop database if exists iud cascade")