[REBASE] Solve conflict after merging master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/3759a244 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/3759a244 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/3759a244 Branch: refs/heads/carbonstore-rebase5 Commit: 3759a24431168ebf290ecbd5ffcac6a435f8c508 Parents: 58c1753 Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Feb 27 11:26:30 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Wed Feb 28 22:13:13 2018 +0800 ---------------------------------------------------------------------- .../carbondata/core/datamap/dev/DataMap.java | 9 +- .../core/datamap/dev/DataMapFactory.java | 2 +- .../exception/ConcurrentOperationException.java | 16 +- .../core/indexstore/BlockletDetailsFetcher.java | 3 +- .../blockletindex/BlockletDataMap.java | 3 +- .../blockletindex/SegmentIndexFileStore.java | 2 - .../core/metadata/PartitionMapFileStore.java | 0 .../scan/executor/util/RestructureUtil.java | 6 +- .../statusmanager/SegmentStatusManager.java | 10 +- .../SegmentUpdateStatusManager.java | 7 +- .../hadoop/api/CarbonTableInputFormat.java | 5 +- .../preaggregate/TestPreAggCreateCommand.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 2 +- .../StandardPartitionGlobalSortTestCase.scala | 2 +- .../exception/ProcessMetaDataException.java | 2 + .../org/apache/carbondata/api/CarbonStore.scala | 6 +- .../carbondata/spark/load/CsvRDDHelper.scala | 157 +++++++++++++++++++ .../load/DataLoadProcessBuilderOnSpark.scala | 3 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 2 +- .../carbondata/spark/util/CommonUtil.scala | 2 - .../command/carbonTableSchemaCommon.scala | 6 +- .../CarbonAlterTableCompactionCommand.scala | 3 +- .../management/CarbonCleanFilesCommand.scala | 2 +- .../CarbonDeleteLoadByIdCommand.scala | 2 +- .../CarbonDeleteLoadByLoadDateCommand.scala | 2 +- .../management/CarbonLoadDataCommand.scala | 28 ++-- 
.../CarbonProjectForDeleteCommand.scala | 2 +- .../CarbonProjectForUpdateCommand.scala | 2 +- .../schema/CarbonAlterTableRenameCommand.scala | 2 +- .../command/table/CarbonDropTableCommand.scala | 2 +- .../datasources/CarbonFileFormat.scala | 3 - .../TestStreamingTableWithRowParser.scala | 9 +- .../vectorreader/AddColumnTestCases.scala | 1 + .../datamap/DataMapWriterListener.java | 3 +- .../loading/model/CarbonLoadModelBuilder.java | 34 +++- .../processing/loading/model/LoadOption.java | 15 +- .../processing/merger/CarbonDataMergerUtil.java | 19 ++- .../merger/CompactionResultSortProcessor.java | 4 +- .../merger/RowResultMergerProcessor.java | 4 +- .../partition/spliter/RowResultProcessor.java | 4 +- .../util/CarbonDataProcessorUtil.java | 3 +- store/sdk/pom.xml | 2 +- .../carbondata/sdk/file/CSVCarbonWriter.java | 8 +- 43 files changed, 296 insertions(+), 105 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index 4a68286..fdeacff 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -38,18 +38,13 @@ public interface DataMap<T extends Blocklet> { /** * Prune the datamap with filter expression and partition information. It returns the list of * blocklets where these filters can exist. 
- * - * @param filterExp - * @return */ - List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions); + List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<PartitionSpec> partitions); // TODO Move this method to Abstract class /** * Validate whether the current segment needs to be fetching the required data - * - * @param filterExp - * @return */ boolean isScanRequired(FilterResolverIntf filterExp); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 50ac279..d8a467f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -21,8 +21,8 @@ import java.util.List; import org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.events.Event; http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java index 7e717ba..918268c 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java +++ b/core/src/main/java/org/apache/carbondata/core/exception/ConcurrentOperationException.java @@ -17,21 +17,10 @@ package org.apache.carbondata.core.exception; -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; - +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; -/** - * This exception will be thrown when executing concurrent operations which - * is not supported in carbon. - * - * For example, when INSERT OVERWRITE is executing, other operations are not - * allowed, so this exception will be thrown - */ -@InterfaceAudience.User -@InterfaceStability.Stable -public class ConcurrentOperationException extends Exception { +public class ConcurrentOperationException extends MalformedCarbonCommandException { public ConcurrentOperationException(String dbName, String tableName, String command1, String command2) { @@ -48,3 +37,4 @@ public class ConcurrentOperationException extends Exception { } } + http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index dd592c0..58c11db 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -53,5 +53,6 @@ public interface BlockletDetailsFetcher { * @param segment * @return */ - List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException; + 
List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) + throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index ce6193b..b379ae3 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -660,7 +660,8 @@ public class BlockletDataMap extends AbstractCoarseGrainDataMap implements Cache } @Override - public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) { + public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<PartitionSpec> partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 537e124..00d03a5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import 
java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java deleted file mode 100644 index e69de29..0000000 http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java index 2712cbc..e67d822 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/executor/util/RestructureUtil.java @@ -80,7 +80,7 @@ public class RestructureUtil { if (queryDimension.getDimension().hasEncoding(Encoding.IMPLICIT)) { presentDimension.add(queryDimension); isDimensionExists[dimIndex] = true; - dimensionInfo.dataType[queryDimension.getQueryOrder()] = + dimensionInfo.dataType[queryDimension.getOrdinal()] = queryDimension.getDimension().getDataType(); } else { for (CarbonDimension tableDimension : tableBlockDimensions) { @@ -95,7 +95,7 @@ public class RestructureUtil { currentBlockDimension.setOrdinal(queryDimension.getOrdinal()); presentDimension.add(currentBlockDimension); isDimensionExists[dimIndex] = true; - dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] = + 
dimensionInfo.dataType[currentBlockDimension.getOrdinal()] = currentBlockDimension.getDimension().getDataType(); break; } @@ -113,7 +113,7 @@ public class RestructureUtil { currentBlockDimension.setOrdinal(queryDimension.getOrdinal()); presentDimension.add(currentBlockDimension); isDimensionExists[dimIndex] = true; - dimensionInfo.dataType[currentBlockDimension.getQueryOrder()] = + dimensionInfo.dataType[currentBlockDimension.getOrdinal()] = currentBlockDimension.getDimension().getDataType(); break; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 89666ab..1bb4b03 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; @@ -838,6 +839,13 @@ public class SegmentStatusManager { public static void deleteLoadsAndUpdateMetadata( CarbonTable carbonTable, boolean isForceDeletion) throws IOException { + deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null); + } + + public static void deleteLoadsAndUpdateMetadata( + CarbonTable carbonTable, + boolean 
isForceDeletion, + List<PartitionSpec> partitionSpecs) throws IOException { if (isLoadDeletionRequired(carbonTable.getMetadataPath())) { LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); @@ -882,7 +890,7 @@ public class SegmentStatusManager { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); if (updationCompletionStatus) { DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( - identifier, carbonTable.getMetadataPath(), isForceDeletion); + identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 6ec6fa2..4a2149e 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -77,8 +77,8 @@ public class SegmentUpdateStatusManager { this.identifier = identifier; // current it is used only for read function scenarios, as file update always requires to work // on latest file status. 
- segmentDetails = - SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(identifier.getTablePath())); + segmentDetails = SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); if (segmentDetails.length > 0) { isPartitionTable = segmentDetails[0].getSegmentFile() != null; } @@ -259,7 +259,8 @@ public class SegmentUpdateStatusManager { + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } else { - String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment); + String carbonDataDirectoryPath = + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment); blockPath = carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index b485b69..5184e07 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -33,8 +33,8 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -494,7 
+494,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (Segment segment : streamSegments) { - String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); + String segmentDir = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), segment.getSegmentNo()); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { String indexName = CarbonTablePath.getCarbonStreamIndexFileName(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 8f63af6..e09b922 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.encoder.Encoding import 
org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index b39c44c..3f0ca42 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory // This testsuite test insert and insert overwrite with other commands concurrently 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala index 7d0959c..629417f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll { var executorService: ExecutorService = _ http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java index 3e06bde..471b645 100644 --- 
a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.exception; +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; + // This exception will be thrown when processMetaData failed in // Carbon's RunnableCommand public class ProcessMetaDataException extends MalformedCarbonCommandException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index b69ec37..bfb1616 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.util.DataLoadingUtil object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -139,9 +138,8 @@ object CarbonStore { carbonCleanFilesLock = CarbonLockUtil .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) - SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull) + 
SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) currentTablePartitions match { case Some(partitions) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala new file mode 100644 index 0000000..36d8c51 --- /dev/null +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/CsvRDDHelper.scala @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.carbondata.spark.load + +import java.text.SimpleDateFormat +import java.util.{Date, Locale} + +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.Path +import org.apache.hadoop.mapred.JobConf +import org.apache.hadoop.mapreduce.{TaskAttemptID, TaskType} +import org.apache.hadoop.mapreduce.lib.input.{FileInputFormat, FileSplit} +import org.apache.hadoop.mapreduce.task.{JobContextImpl, TaskAttemptContextImpl} +import org.apache.spark.deploy.SparkHadoopUtil +import org.apache.spark.rdd.RDD +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.GenericInternalRow +import org.apache.spark.sql.execution.datasources.{FilePartition, FileScanRDD, PartitionedFile} +import org.apache.spark.sql.util.SparkSQLUtil.sessionState + +import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} +import org.apache.carbondata.processing.loading.csvinput.CSVInputFormat +import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.rdd.SerializableConfiguration +import org.apache.carbondata.spark.util.CommonUtil + +object CsvRDDHelper { + + private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) + + /** + * creates an RDD that reads multiple CSV files + */ + def csvFileScanRDD( + spark: SparkSession, + model: CarbonLoadModel, + hadoopConf: Configuration + ): RDD[InternalRow] = { + // 1. 
partition + val defaultMaxSplitBytes = sessionState(spark).conf.filesMaxPartitionBytes + val openCostInBytes = sessionState(spark).conf.filesOpenCostInBytes + val defaultParallelism = spark.sparkContext.defaultParallelism + CommonUtil.configureCSVInputFormat(hadoopConf, model) + hadoopConf.set(FileInputFormat.INPUT_DIR, model.getFactFilePath) + val jobConf = new JobConf(hadoopConf) + SparkHadoopUtil.get.addCredentials(jobConf) + val jobContext = new JobContextImpl(jobConf, null) + val inputFormat = new CSVInputFormat() + val rawSplits = inputFormat.getSplits(jobContext).toArray + val splitFiles = rawSplits.map { split => + val fileSplit = split.asInstanceOf[FileSplit] + PartitionedFile( + InternalRow.empty, + fileSplit.getPath.toString, + fileSplit.getStart, + fileSplit.getLength, + fileSplit.getLocations) + }.sortBy(_.length)(implicitly[Ordering[Long]].reverse) + val totalBytes = splitFiles.map(_.length + openCostInBytes).sum + val bytesPerCore = totalBytes / defaultParallelism + + val maxSplitBytes = Math.min(defaultMaxSplitBytes, Math.max(openCostInBytes, bytesPerCore)) + LOGGER.info(s"Planning scan with bin packing, max size: $maxSplitBytes bytes, " + + s"open cost is considered as scanning $openCostInBytes bytes.") + + val partitions = new ArrayBuffer[FilePartition] + val currentFiles = new ArrayBuffer[PartitionedFile] + var currentSize = 0L + + def closePartition(): Unit = { + if (currentFiles.nonEmpty) { + val newPartition = + FilePartition( + partitions.size, + currentFiles.toArray.toSeq) + partitions += newPartition + } + currentFiles.clear() + currentSize = 0 + } + + splitFiles.foreach { file => + if (currentSize + file.length > maxSplitBytes) { + closePartition() + } + // Add the given file to the current partition. + currentSize += file.length + openCostInBytes + currentFiles += file + } + closePartition() + + // 2. 
read function + val serializableConfiguration = new SerializableConfiguration(jobConf) + val readFunction = new (PartitionedFile => Iterator[InternalRow]) with Serializable { + override def apply(file: PartitionedFile): Iterator[InternalRow] = { + new Iterator[InternalRow] { + val hadoopConf = serializableConfiguration.value + val jobTrackerId: String = { + val formatter = new SimpleDateFormat("yyyyMMddHHmmss", Locale.US) + formatter.format(new Date()) + } + val attemptId = new TaskAttemptID(jobTrackerId, 0, TaskType.MAP, 0, 0) + val hadoopAttemptContext = new TaskAttemptContextImpl(hadoopConf, attemptId) + val inputSplit = + new FileSplit(new Path(file.filePath), file.start, file.length, file.locations) + var finished = false + val inputFormat = new CSVInputFormat() + val reader = inputFormat.createRecordReader(inputSplit, hadoopAttemptContext) + reader.initialize(inputSplit, hadoopAttemptContext) + + override def hasNext: Boolean = { + if (!finished) { + if (reader != null) { + if (reader.nextKeyValue()) { + true + } else { + finished = true + reader.close() + false + } + } else { + finished = true + false + } + } else { + false + } + } + + override def next(): InternalRow = { + new GenericInternalRow(reader.getCurrentValue.get().asInstanceOf[Array[Any]]) + } + } + } + } + new FileScanRDD(spark, readFunction, partitions) + } + +} http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index e1bd84b..1062cd7 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -34,7 +34,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.util.DataLoadingUtil /** * Use sortBy operator in spark to load the data @@ -52,7 +51,7 @@ object DataLoadProcessBuilderOnSpark { } else { // input data from files val columnCount = model.getCsvHeaderColumns.length - DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf) + CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf) .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 33263d6..298c84e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel} import org.apache.spark.sql.types._ +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import 
org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -45,7 +46,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object CarbonScalaUtil { def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 9104a32..d3093fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -816,8 +816,6 @@ object CommonUtil { val carbonTable = CarbonMetadata.getInstance .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, null) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " + http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala 
b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 71ce2c6..3c21af3 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema} import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} -import org.apache.carbondata.core.service.CarbonCommonFactory import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.DataTypeUtil -import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CompactionType 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index c59209f..7bcf3bc 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -21,7 +21,7 @@ import java.io.{File, IOException} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} @@ -47,7 +47,6 @@ import org.apache.carbondata.processing.loading.events.LoadEvents.LoadMetadataEv import org.apache.carbondata.processing.loading.model.{CarbonDataLoadSchema, CarbonLoadModel} import org.apache.carbondata.processing.merger.{CarbonDataMergerUtil, CompactionType} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.CommonUtil import org.apache.carbondata.streaming.StreamHandoffRDD import org.apache.carbondata.streaming.segment.StreamSegment http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala 
---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index d2adc57..2092028 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala index 0861c63..81427a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByIdCommand( loadIds: Seq[String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala index dcbc6ce..1d76bda 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, 
OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByLoadDateCommand( databaseNameOp: Option[String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 4806a6f..34a464a 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.SegmentStatus -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException @@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import 
org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.exception.NoRetryException -import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} -import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark +import org.apache.carbondata.spark.load.{CsvRDDHelper, DataLoadProcessorStepOnSpark} import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -193,12 +189,18 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) + + val javaPartition = mutable.Map[String, String]() + partition.foreach { case (k, v) => + if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get + } + new 
CarbonLoadModelBuilder(table).build( options.asJava, optionsFinal, carbonLoadModel, hadoopConf, - partition, + javaPartition.asJava, dataFrame.isDefined) // Delete stale segment folders that are not in table status but are physically present in // the Fact folder @@ -231,11 +233,7 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. - SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = false, - table, - currPartitions) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( @@ -672,7 +670,7 @@ case class CarbonLoadDataCommand( } } val columnCount = carbonLoadModel.getCsvHeaderColumns.length - val rdd = DataLoadingUtil.csvFileScanRDD( + val rdd = CsvRDDHelper.csvFileScanRDD( sparkSession, model = carbonLoadModel, hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index f074285..230378b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 5165342..2a92478 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment +import 
org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 5f8eb12..474f9c6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -23,12 +23,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 8001a93..0298eea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException} case class CarbonDropTableCommand( ifExistsSet: Boolean, http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 61a31a5..2eed988 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} class CarbonFileFormat extends FileFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala index a3df2be..1363797 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableWithRowParser.scala @@ -34,7 +34,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} import 
org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.streaming.parser.CarbonStreamParser case class FileElement(school: Array[String], age: Integer) @@ -735,7 +735,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { def createSocketStreamingThread( spark: SparkSession, port: Int, - tablePath: CarbonTablePath, + tablePath: String, tableIdentifier: TableIdentifier, badRecordAction: String = "force", intervalSecond: Int = 2, @@ -776,7 +776,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"$intervalSecond seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(tablePath)) .option("bad_records_action", badRecordAction) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) @@ -817,7 +817,6 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier(tableName, Option("streaming1")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket() @@ -830,7 +829,7 @@ class TestStreamingTableWithRowParser extends QueryTest with BeforeAndAfterAll { val thread2 = createSocketStreamingThread( spark = spark, port = server.getLocalPort, - tablePath = tablePath, + tablePath = carbonTable.getTablePath, tableIdentifier = identifier, badRecordAction = badRecordAction, intervalSecond = intervalOfIngest, 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index 995f041..d94570a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException class AddColumnTestCases extends Spark2QueryTest with BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 1104229..66f8bc5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -74,7 +74,8 @@ public class DataMapWriterListener { } List<String> columns = factory.getMeta().getIndexedColumns(); List<AbstractDataMapWriter> writers = registry.get(columns); - 
AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath); + AbstractDataMapWriter writer = factory.createWriter( + new Segment(segmentId, null), dataWritePath); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 17e8dbe..29dfa40 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder { Map<String, String> optionsFinal, CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws InvalidLoadOptionException, IOException { + build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false); + } + + /** + * build CarbonLoadModel for data loading + * @param options Load options from user input + * @param optionsFinal Load options that populated with default values for optional options + * @param carbonLoadModel The output load model + * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in + * user provided load options + * @param partitions partition name map to path + * @param isDataFrame true if build for load for dataframe + */ + public void build( + 
Map<String, String> options, + Map<String, String> optionsFinal, + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + Map<String, String> partitions, + boolean isDataFrame) throws InvalidLoadOptionException, IOException { carbonLoadModel.setTableName(table.getTableName()); carbonLoadModel.setDatabaseName(table.getDatabaseName()); carbonLoadModel.setTablePath(table.getTablePath()); @@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder { carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)); carbonLoadModel.setCsvHeader(fileHeader); carbonLoadModel.setColDictFilePath(column_dict); + + List<String> ignoreColumns = new ArrayList<>(); + if (!isDataFrame) { + for (Map.Entry<String, String> partition : partitions.entrySet()) { + if (partition.getValue() != null) { + ignoreColumns.add(partition.getKey()); + } + } + } + carbonLoadModel.setCsvHeaderColumns( - LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf)); + LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns)); int validatedMaxColumns = validateMaxColumns( carbonLoadModel.getCsvHeaderColumns(), http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 5af4859..bac1a94 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.carbondata.common.Maps; @@ 
-201,6 +203,16 @@ public class LoadOption { public static String[] getCsvHeaderColumns( CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws IOException { + return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>()); + } + + /** + * Return CSV header field names, with partition column + */ + public static String[] getCsvHeaderColumns( + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + List<String> staticPartitionCols) throws IOException { String delimiter; if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) { delimiter = CarbonCommonConstants.COMMA; @@ -231,7 +243,7 @@ public class LoadOption { } if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns, - carbonLoadModel.getCarbonDataLoadSchema())) { + carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) { if (csvFile == null) { LOG.error("CSV header in DDL is not proper." + " Column names in schema and CSV header are not the same."); @@ -249,4 +261,5 @@ public class LoadOption { } return csvColumns; } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index d2faef5..142b2cb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -21,7 +21,16 @@ import java.io.File; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import 
java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -32,7 +41,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; @@ -592,10 +600,6 @@ public final class CarbonDataMergerUtil { List<LoadMetadataDetails> segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); - CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier(); - - // total length long totalLength = 0; @@ -1013,7 +1017,8 @@ public final class CarbonDataMergerUtil { CarbonFile[] updateDeltaFiles = null; Set<String> uniqueBlocks = new HashSet<String>(); - String segmentPath = CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), seg.getSegmentNo()); + String segmentPath = CarbonTablePath.getSegmentPath( + absoluteTableIdentifier.getTablePath(), seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index ea11e22..ebcf944 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -405,8 +405,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 278d5bb..2616def 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -76,8 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR 
+ loadModel .getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index df2e2a2..221697f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -47,8 +47,8 @@ public class RowResultProcessor { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); - String carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index dc8ffd7..19ad47d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -370,7 +370,8 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) { + public static String createCarbonStoreLocation(String databaseName, String tableName, + String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 54fba55..b3dd464 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -7,7 +7,7 @@ <parent> <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-parent</artifactId> - <version>1.3.0-SNAPSHOT</version> + <version>1.4.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent> http://git-wip-us.apache.org/repos/asf/carbondata/blob/3759a244/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java ---------------------------------------------------------------------- diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java 
b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java index dc5696a..df6afc6 100644 --- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java +++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CSVCarbonWriter.java @@ -23,7 +23,7 @@ import java.util.UUID; import org.apache.carbondata.common.annotations.InterfaceAudience; import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat; -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable; +import org.apache.carbondata.hadoop.internal.ObjectArrayWritable; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.hadoop.conf.Configuration; @@ -42,9 +42,9 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; @InterfaceAudience.Internal class CSVCarbonWriter extends CarbonWriter { - private RecordWriter<NullWritable, StringArrayWritable> recordWriter; + private RecordWriter<NullWritable, ObjectArrayWritable> recordWriter; private TaskAttemptContext context; - private StringArrayWritable writable; + private ObjectArrayWritable writable; CSVCarbonWriter(CarbonLoadModel loadModel) throws IOException { Configuration hadoopConf = new Configuration(); @@ -57,7 +57,7 @@ class CSVCarbonWriter extends CarbonWriter { TaskAttemptContextImpl context = new TaskAttemptContextImpl(hadoopConf, attemptID); this.recordWriter = format.getRecordWriter(context); this.context = context; - this.writable = new StringArrayWritable(); + this.writable = new ObjectArrayWritable(); } /**