[REBASE] Resolve conflicts after merging master
Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/7466d653 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/7466d653 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/7466d653 Branch: refs/heads/carbonstore-rebase4 Commit: 7466d65382eff27f86a3701e20bbea5019bb9d43 Parents: de36a5d Author: Jacky Li <jacky.li...@qq.com> Authored: Tue Feb 27 11:26:30 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Tue Feb 27 11:26:30 2018 +0800 ---------------------------------------------------------------------- .../ConcurrentOperationException.java | 50 -------------------- .../carbondata/core/datamap/TableDataMap.java | 5 +- .../carbondata/core/datamap/dev/DataMap.java | 9 +--- .../core/datamap/dev/DataMapFactory.java | 2 +- .../core/indexstore/BlockletDetailsFetcher.java | 3 +- .../blockletindex/BlockletDataMap.java | 3 +- .../blockletindex/SegmentIndexFileStore.java | 2 - .../core/metadata/PartitionMapFileStore.java | 0 .../statusmanager/SegmentStatusManager.java | 10 +++- .../SegmentUpdateStatusManager.java | 7 +-- .../hadoop/api/CarbonTableInputFormat.java | 5 +- .../preaggregate/TestPreAggCreateCommand.scala | 2 +- .../TestInsertAndOtherCommandConcurrent.scala | 2 +- .../StandardPartitionGlobalSortTestCase.scala | 2 +- .../exception/ProcessMetaDataException.java | 2 + .../org/apache/carbondata/api/CarbonStore.scala | 6 +-- .../load/DataLoadProcessBuilderOnSpark.scala | 4 +- .../carbondata/spark/util/CarbonScalaUtil.scala | 2 +- .../carbondata/spark/util/CommonUtil.scala | 2 - .../command/carbonTableSchemaCommon.scala | 6 +-- .../CarbonAlterTableCompactionCommand.scala | 14 +----- .../management/CarbonCleanFilesCommand.scala | 2 +- .../CarbonDeleteLoadByIdCommand.scala | 2 +- .../CarbonDeleteLoadByLoadDateCommand.scala | 2 +- .../management/CarbonLoadDataCommand.scala | 28 +++++------ .../CarbonProjectForDeleteCommand.scala | 2 +- 
.../CarbonProjectForUpdateCommand.scala | 2 +- .../schema/CarbonAlterTableRenameCommand.scala | 2 +- .../command/table/CarbonDropTableCommand.scala | 2 +- .../datasources/CarbonFileFormat.scala | 3 -- .../vectorreader/AddColumnTestCases.scala | 1 + .../datamap/DataMapWriterListener.java | 3 +- .../loading/model/CarbonLoadModelBuilder.java | 34 ++++++++++++- .../processing/loading/model/LoadOption.java | 15 +++++- .../processing/merger/CarbonDataMergerUtil.java | 19 +++++--- .../merger/CompactionResultSortProcessor.java | 4 +- .../merger/RowResultMergerProcessor.java | 4 +- .../partition/spliter/RowResultProcessor.java | 4 +- .../util/CarbonDataProcessorUtil.java | 3 +- store/sdk/pom.xml | 2 +- 40 files changed, 130 insertions(+), 142 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java b/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java deleted file mode 100644 index 7308100..0000000 --- a/common/src/main/java/org/apache/carbondata/common/exceptions/ConcurrentOperationException.java +++ /dev/null @@ -1,50 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.carbondata.common.exceptions; - -import org.apache.carbondata.common.annotations.InterfaceAudience; -import org.apache.carbondata.common.annotations.InterfaceStability; - -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; - -/** - * This exception will be thrown when executing concurrent operations which - * is not supported in carbon. - * - * For example, when INSERT OVERWRITE is executing, other operations are not - * allowed, so this exception will be thrown - */ -@InterfaceAudience.User -@InterfaceStability.Stable -public class ConcurrentOperationException extends Exception { - - public ConcurrentOperationException(String dbName, String tableName, String command1, - String command2) { - super(command1 + " is in progress for table " + dbName + "." 
+ tableName + ", " + command2 + - " operation is not allowed"); - } - - public ConcurrentOperationException(CarbonTable table, String command1, String command2) { - this(table.getDatabaseName(), table.getTableName(), command1, command2); - } - - public String getMessage() { - return super.getMessage(); - } - -} http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java index 2a6ceaa..5a01ec1 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/TableDataMap.java @@ -29,8 +29,8 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.indexstore.Blocklet; import org.apache.carbondata.core.indexstore.BlockletDetailsFetcher; import org.apache.carbondata.core.indexstore.ExtendedBlocklet; -import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.FineGrainBlocklet; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.indexstore.SegmentPropertiesFetcher; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf; @@ -160,7 +160,8 @@ public final class TableDataMap extends OperationEventListener { writePath + CarbonCommonConstants.FILE_SEPARATOR + System.nanoTime(); detailedBlocklet.setDataMapWriterPath(blockletwritePath); serializer.serializeBlocklet((FineGrainBlocklet) blocklet, blockletwritePath); - }detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo()); + } + detailedBlocklet.setSegmentId(distributable.getSegment().getSegmentNo()); 
detailedBlocklets.add(detailedBlocklet); } return detailedBlocklets; http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java index 4a68286..fdeacff 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMap.java @@ -38,18 +38,13 @@ public interface DataMap<T extends Blocklet> { /** * Prune the datamap with filter expression and partition information. It returns the list of * blocklets where these filters can exist. - * - * @param filterExp - * @return */ - List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions); + List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<PartitionSpec> partitions); // TODO Move this method to Abstract class /** * Validate whether the current segment needs to be fetching the required data - * - * @param filterExp - * @return */ boolean isScanRequired(FilterResolverIntf filterExp); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java index 50ac279..d8a467f 100644 --- a/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java +++ b/core/src/main/java/org/apache/carbondata/core/datamap/dev/DataMapFactory.java @@ -21,8 +21,8 @@ import java.util.List; import 
org.apache.carbondata.core.datamap.DataMapDistributable; import org.apache.carbondata.core.datamap.DataMapMeta; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; import org.apache.carbondata.events.Event; http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java index dd592c0..58c11db 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/BlockletDetailsFetcher.java @@ -53,5 +53,6 @@ public interface BlockletDetailsFetcher { * @param segment * @return */ - List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) throws IOException; + List<Blocklet> getAllBlocklets(Segment segment, List<PartitionSpec> partitions) + throws IOException; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java index ce6193b..b379ae3 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockletDataMap.java @@ -660,7 +660,8 @@ public class BlockletDataMap 
extends AbstractCoarseGrainDataMap implements Cache } @Override - public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, List<PartitionSpec> partitions) { + public List<Blocklet> prune(FilterResolverIntf filterExp, SegmentProperties segmentProperties, + List<PartitionSpec> partitions) { if (unsafeMemoryDMStore.getRowCount() == 0) { return new ArrayList<>(); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java index 537e124..00d03a5 100644 --- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/SegmentIndexFileStore.java @@ -21,10 +21,8 @@ import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java deleted file mode 100644 index e69de29..0000000 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java index 89666ab..1bb4b03 100755 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java @@ -39,6 +39,7 @@ import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.fileoperations.AtomicFileOperations; import org.apache.carbondata.core.fileoperations.AtomicFileOperationsImpl; import org.apache.carbondata.core.fileoperations.FileWriteOperation; +import org.apache.carbondata.core.indexstore.PartitionSpec; import org.apache.carbondata.core.locks.CarbonLockFactory; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; @@ -838,6 +839,13 @@ public class SegmentStatusManager { public static void deleteLoadsAndUpdateMetadata( CarbonTable carbonTable, boolean isForceDeletion) throws IOException { + deleteLoadsAndUpdateMetadata(carbonTable, isForceDeletion, null); + } + + public static void deleteLoadsAndUpdateMetadata( + CarbonTable carbonTable, + boolean isForceDeletion, + List<PartitionSpec> partitionSpecs) throws IOException { if (isLoadDeletionRequired(carbonTable.getMetadataPath())) { LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); @@ -882,7 +890,7 @@ public class SegmentStatusManager { CarbonLockUtil.fileUnlock(carbonTableStatusLock, LockUsage.TABLE_STATUS_LOCK); if (updationCompletionStatus) { DeleteLoadFolders.physicalFactAndMeasureMetadataDeletion( - identifier, carbonTable.getMetadataPath(), 
isForceDeletion); + identifier, carbonTable.getMetadataPath(), isForceDeletion, partitionSpecs); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 6ec6fa2..4a2149e 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -77,8 +77,8 @@ public class SegmentUpdateStatusManager { this.identifier = identifier; // current it is used only for read function scenarios, as file update always requires to work // on latest file status. - segmentDetails = - SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(identifier.getTablePath())); + segmentDetails = SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); if (segmentDetails.length > 0) { isPartitionTable = segmentDetails[0].getSegmentFile() != null; } @@ -259,7 +259,8 @@ public class SegmentUpdateStatusManager { + CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.PART_ID) .replace("#", "/") + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } else { - String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment); + String carbonDataDirectoryPath = + CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment); blockPath = carbonDataDirectoryPath + CarbonCommonConstants.FILE_SEPARATOR + completeBlockName; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java 
---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index b485b69..5184e07 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -33,8 +33,8 @@ import java.util.Map; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datamap.DataMapStoreManager; -import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.DataMapType; +import org.apache.carbondata.core.datamap.Segment; import org.apache.carbondata.core.datamap.TableDataMap; import org.apache.carbondata.core.datastore.filesystem.CarbonFile; import org.apache.carbondata.core.datastore.impl.FileFactory; @@ -494,7 +494,8 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (Segment segment : streamSegments) { - String segmentDir = CarbonTablePath.getSegmentPath(identifier.getTablePath(), segment.getSegmentNo()); + String segmentDir = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), segment.getSegmentNo()); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { String indexName = CarbonTablePath.getCarbonStreamIndexFileName(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala index 8f63af6..e09b922 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/preaggregate/TestPreAggCreateCommand.scala @@ -26,7 +26,7 @@ import org.apache.spark.sql.hive.CarbonRelation import org.apache.spark.sql.test.util.QueryTest import org.scalatest.BeforeAndAfterAll -import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException +import org.apache.carbondata.common.exceptions.sql.{MalformedCarbonCommandException, MalformedDataMapCommandException} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.metadata.schema.datamap.DataMapProvider.TIMESERIES http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala index b39c44c..3f0ca42 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/iud/TestInsertAndOtherCommandConcurrent.scala @@ -34,11 +34,11 @@ import org.apache.carbondata.core.datamap.dev.cgdatamap.{AbstractCoarseGrainData import org.apache.carbondata.core.datamap.dev.{AbstractDataMapWriter, DataMap} import org.apache.carbondata.core.datamap.{DataMapDistributable, DataMapMeta, DataMapStoreManager} import org.apache.carbondata.core.datastore.page.ColumnPage +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.scan.filter.intf.ExpressionType import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.events.Event -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.testsuite.datamap.C2DataMapFactory // This testsuite test insert and insert overwrite with other commands concurrently http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala index 7d0959c..629417f 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionGlobalSortTestCase.scala @@ -24,9 +24,9 @@ import org.apache.spark.sql.test.util.QueryTest import 
org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.constants.LoggerAction +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class StandardPartitionGlobalSortTestCase extends QueryTest with BeforeAndAfterAll { var executorService: ExecutorService = _ http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java index 3e06bde..471b645 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/exception/ProcessMetaDataException.java @@ -17,6 +17,8 @@ package org.apache.carbondata.spark.exception; +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException; + // This exception will be thrown when processMetaData failed in // Carbon's RunnableCommand public class ProcessMetaDataException extends MalformedCarbonCommandException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala index b69ec37..bfb1616 100644 --- 
a/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/api/CarbonStore.scala @@ -36,7 +36,6 @@ import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, SegmentFile import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager -import org.apache.carbondata.spark.util.DataLoadingUtil object CarbonStore { private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName) @@ -139,9 +138,8 @@ object CarbonStore { carbonCleanFilesLock = CarbonLockUtil .getLockObject(absoluteTableIdentifier, LockUsage.CLEAN_FILES_LOCK, errorMsg) - SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, currentTablePartitions.map(_.asJava).orNull) + SegmentStatusManager.deleteLoadsAndUpdateMetadata( + carbonTable, true, currentTablePartitions.map(_.asJava).orNull) CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true) currentTablePartitions match { case Some(partitions) => http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index e1bd84b..2648642 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -24,6 +24,7 @@ import 
org.apache.spark.TaskContext import org.apache.spark.sql.{DataFrame, SparkSession} import org.apache.spark.sql.execution.command.ExecutionErrors import org.apache.spark.storage.StorageLevel +import org.apache.spark.util.CsvRDDHelper import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants @@ -34,7 +35,6 @@ import org.apache.carbondata.processing.loading.{DataLoadProcessBuilder, Failure import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.sort.sortdata.{NewRowComparator, NewRowComparatorForNormalDims, SortParameters} import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.util.DataLoadingUtil /** * Use sortBy operator in spark to load the data @@ -52,7 +52,7 @@ object DataLoadProcessBuilderOnSpark { } else { // input data from files val columnCount = model.getCsvHeaderColumns.length - DataLoadingUtil.csvFileScanRDD(sparkSession, model, hadoopConf) + CsvRDDHelper.csvFileScanRDD(sparkSession, model, hadoopConf) .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala index 33263d6..298c84e 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CarbonScalaUtil.scala @@ -30,6 +30,7 @@ import org.apache.spark.sql.catalyst.util.DateTimeUtils import org.apache.spark.sql.execution.command.{DataTypeInfo, UpdateTableModel} import 
org.apache.spark.sql.types._ +import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.LogService import org.apache.carbondata.core.cache.{Cache, CacheProvider, CacheType} import org.apache.carbondata.core.cache.dictionary.{Dictionary, DictionaryColumnUniqueIdentifier} @@ -45,7 +46,6 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.exception.CarbonDataLoadingException import org.apache.carbondata.processing.util.CarbonDataProcessorUtil -import org.apache.carbondata.spark.exception.MalformedCarbonCommandException object CarbonScalaUtil { def convertSparkToCarbonDataType(dataType: DataType): CarbonDataType = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala index 9104a32..d3093fb 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/util/CommonUtil.scala @@ -816,8 +816,6 @@ object CommonUtil { val carbonTable = CarbonMetadata.getInstance .getCarbonTable(identifier.getCarbonTableIdentifier.getTableUniqueName) SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = true, carbonTable, null) } catch { case _: Exception => LOGGER.warn(s"Error while cleaning table " + 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala index 71ce2c6..3c21af3 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/sql/execution/command/carbonTableSchemaCommon.scala @@ -31,19 +31,15 @@ import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment import org.apache.carbondata.core.indexstore.PartitionSpec -import org.apache.carbondata.core.metadata.CarbonTableIdentifier -import org.apache.carbondata.core.metadata.SegmentFileStore.SegmentFile -import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, CarbonTableIdentifier} +import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier import org.apache.carbondata.core.metadata.datatype.{DataType, DataTypes, DecimalType} import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema._ import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, RelationIdentifier, TableInfo, TableSchema} import org.apache.carbondata.core.metadata.schema.table.column.{ColumnSchema, ParentColumnTableRelation} -import org.apache.carbondata.core.service.CarbonCommonFactory import org.apache.carbondata.core.service.impl.ColumnUniqueIdGenerator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentUpdateStatusManager} import org.apache.carbondata.core.util.DataTypeUtil 
-import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.FailureCauses import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.merger.CompactionType http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala index 75de1fe..cdea4c8 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAlterTableCompactionCommand.scala @@ -21,7 +21,7 @@ import java.io.{File, IOException} import scala.collection.JavaConverters._ -import org.apache.spark.sql.{CarbonEnv, Row, SQLContext, SparkSession} +import org.apache.spark.sql.{CarbonEnv, Row, SparkSession, SQLContext} import org.apache.spark.sql.catalyst.analysis.NoSuchTableException import org.apache.spark.sql.catalyst.TableIdentifier import org.apache.spark.sql.execution.command.{AlterTableModel, AtomicRunnableCommand, CarbonMergerMapping, CompactionModel} @@ -30,11 +30,11 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.util.CarbonException import org.apache.spark.util.AlterTableUtil -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import 
org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, LockUsage} import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.mutate.CarbonUpdateUtil @@ -91,16 +91,6 @@ case class CarbonAlterTableCompactionCommand( } override def processData(sparkSession: SparkSession): Seq[Row] = { - val LOGGER: LogService = - LogServiceFactory.getLogService(this.getClass.getName) - val isLoadInProgress = SegmentStatusManager.checkIfAnyLoadInProgressForTable(table) - if (isLoadInProgress) { - val message = "Cannot run data loading and compaction on same table concurrently. " + - "Please wait for load to finish" - LOGGER.error(message) - throw new ConcurrentOperationException(message) - } - var compactionType: CompactionType = null var compactionException = "true" try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala index d2adc57..2092028 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonCleanFilesCommand.scala @@ -26,10 +26,10 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.carbondata.api.CarbonStore import org.apache.carbondata.common.logging.LogServiceFactory import 
org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.indexstore.PartitionSpec import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{CleanFilesPostEvent, CleanFilesPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException import org.apache.carbondata.spark.util.CommonUtil /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala index 0861c63..81427a1 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByIdCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByIdPostEvent, DeleteSegmentByIdPreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByIdCommand( loadIds: Seq[String], 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala index dcbc6ce..1d76bda 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonDeleteLoadByLoadDateCommand.scala @@ -21,9 +21,9 @@ import org.apache.spark.sql.{CarbonEnv, Row, SparkSession} import org.apache.spark.sql.execution.command.{Checker, DataCommand} import org.apache.carbondata.api.CarbonStore +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.events.{DeleteSegmentByDatePostEvent, DeleteSegmentByDatePreEvent, OperationContext, OperationListenerBus} -import org.apache.carbondata.spark.exception.ConcurrentOperationException case class CarbonDeleteLoadByLoadDateCommand( databaseNameOp: Option[String], http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala index 4806a6f..92e8942 100644 --- 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.optimizer.CarbonFilters import org.apache.spark.sql.types._ import org.apache.spark.storage.StorageLevel import org.apache.spark.unsafe.types.UTF8String -import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, FileUtils} +import org.apache.spark.util.{CarbonReflectionUtils, CausedBy, CsvRDDHelper, FileUtils} import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} @@ -59,10 +59,9 @@ import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.{CarbonTable, TableInfo} import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema import org.apache.carbondata.core.mutate.{CarbonUpdateUtil, TupleIdEnum} -import org.apache.carbondata.core.statusmanager.SegmentStatus -import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, DataTypeUtil} import org.apache.carbondata.core.statusmanager.{SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil} +import org.apache.carbondata.core.util.DataTypeUtil import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.events.exception.PreEventException @@ -71,18 +70,15 @@ import org.apache.carbondata.processing.exception.DataLoadingException import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadMetadataEvent, LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import 
org.apache.carbondata.processing.loading.exception.NoRetryException -import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} -import org.apache.carbondata.processing.util.CarbonLoaderUtil +import org.apache.carbondata.processing.loading.model.{CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.processing.loading.model.CarbonLoadModel import org.apache.carbondata.processing.loading.sort.SortScopeOptions import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.dictionary.provider.SecureDictionaryServiceProvider import org.apache.carbondata.spark.dictionary.server.SecureDictionaryServer -import org.apache.carbondata.spark.rdd.{CarbonDataRDDFactory, CarbonDropPartitionRDD} -import org.apache.carbondata.spark.util.{CarbonScalaUtil, CommonUtil, GlobalDictionaryUtil} import org.apache.carbondata.spark.load.DataLoadProcessorStepOnSpark import org.apache.carbondata.spark.rdd.CarbonDataRDDFactory -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} +import org.apache.carbondata.spark.util.{CarbonScalaUtil, GlobalDictionaryUtil, SparkDataTypeConverterImpl} case class CarbonLoadDataCommand( databaseNameOp: Option[String], @@ -193,12 +189,18 @@ case class CarbonLoadDataCommand( carbonLoadModel.setAggLoadRequest( internalOptions.getOrElse(CarbonCommonConstants.IS_INTERNAL_LOAD_CALL, "false").toBoolean) carbonLoadModel.setSegmentId(internalOptions.getOrElse("mergedSegmentName", "")) + + val javaPartition = mutable.Map[String, String]() + partition.foreach { case (k, v) => + if (v.isEmpty) javaPartition(k) = null else javaPartition(k) = v.get + } + new CarbonLoadModelBuilder(table).build( options.asJava, optionsFinal, carbonLoadModel, hadoopConf, - partition, + javaPartition.asJava, dataFrame.isDefined) // Delete stale segment folders that are not in table status but are 
physically present in // the Fact folder @@ -231,11 +233,7 @@ case class CarbonLoadDataCommand( // First system has to partition the data first and then call the load data LOGGER.info(s"Initiating Direct Load for the Table : ($dbName.$tableName)") // Clean up the old invalid segment data before creating a new entry for new load. - SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false) - DataLoadingUtil.deleteLoadsAndUpdateMetadata( - isForceDeletion = false, - table, - currPartitions) + SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, currPartitions) // add the start entry for the new load in the table status file if (updateModel.isEmpty && !table.isHivePartitionTable) { CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta( @@ -672,7 +670,7 @@ case class CarbonLoadDataCommand( } } val columnCount = carbonLoadModel.getCsvHeaderColumns.length - val rdd = DataLoadingUtil.csvFileScanRDD( + val rdd = CsvRDDHelper.csvFileScanRDD( sparkSession, model = carbonLoadModel, hadoopConf).map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala index f074285..230378b 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForDeleteCommand.scala @@ -22,8 +22,8 @@ import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan import org.apache.spark.sql.execution.command._ import 
org.apache.spark.sql.execution.datasources.LogicalRelation -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala index 5165342..2a92478 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonProjectForUpdateCommand.scala @@ -25,10 +25,10 @@ import org.apache.spark.sql.execution.datasources.LogicalRelation import org.apache.spark.sql.types.ArrayType import org.apache.spark.storage.StorageLevel -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.Segment +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, LockUsage} import org.apache.carbondata.core.mutate.CarbonUpdateUtil import org.apache.carbondata.core.statusmanager.SegmentStatusManager 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala index 5f8eb12..474f9c6 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/schema/CarbonAlterTableRenameCommand.scala @@ -23,12 +23,12 @@ import org.apache.spark.sql.execution.command.{AlterTableRenameModel, MetadataCo import org.apache.spark.sql.hive.{CarbonRelation, CarbonSessionCatalog} import org.apache.spark.util.AlterTableUtil -import org.apache.carbondata.common.exceptions.ConcurrentOperationException import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datamap.DataMapStoreManager import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.CarbonTableIdentifier import org.apache.carbondata.core.metadata.schema.table.CarbonTable http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala index 8001a93..0298eea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/command/table/CarbonDropTableCommand.scala @@ -28,12 +28,12 @@ import org.apache.spark.sql.execution.command.AtomicRunnableCommand import org.apache.carbondata.common.logging.{LogService, LogServiceFactory} import org.apache.carbondata.core.cache.dictionary.ManageDictionaryAndBTree import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.exception.ConcurrentOperationException import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage} import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.SegmentStatusManager import org.apache.carbondata.core.util.CarbonUtil import org.apache.carbondata.events._ -import org.apache.carbondata.spark.exception.{ConcurrentOperationException, ProcessMetaDataException} case class CarbonDropTableCommand( ifExistsSet: Boolean, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index 61a31a5..2eed988 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ 
b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -50,11 +50,8 @@ import org.apache.carbondata.hadoop.api.{CarbonOutputCommitter, CarbonTableOutpu import org.apache.carbondata.hadoop.api.CarbonTableOutputFormat.CarbonRecordWriter import org.apache.carbondata.hadoop.internal.ObjectArrayWritable import org.apache.carbondata.hadoop.util.ObjectSerializationUtil -import org.apache.carbondata.processing.loading.csvinput.StringArrayWritable import org.apache.carbondata.processing.loading.model.{CarbonLoadModel, CarbonLoadModelBuilder, LoadOption} import org.apache.carbondata.spark.util.{CarbonScalaUtil, Util} -import org.apache.carbondata.processing.loading.model.CarbonLoadModel -import org.apache.carbondata.spark.util.{CarbonScalaUtil, DataLoadingUtil, Util} class CarbonFileFormat extends FileFormat http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala index 995f041..d94570a 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/vectorreader/AddColumnTestCases.scala @@ -29,6 +29,7 @@ import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.common.exceptions.sql.MalformedCarbonCommandException import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.util.CarbonProperties +import org.apache.carbondata.spark.exception.ProcessMetaDataException class AddColumnTestCases extends Spark2QueryTest with 
BeforeAndAfterAll { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java index 1104229..66f8bc5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java +++ b/processing/src/main/java/org/apache/carbondata/processing/datamap/DataMapWriterListener.java @@ -74,7 +74,8 @@ public class DataMapWriterListener { } List<String> columns = factory.getMeta().getIndexedColumns(); List<AbstractDataMapWriter> writers = registry.get(columns); - AbstractDataMapWriter writer = factory.createWriter(new Segment(segmentId, null), dataWritePath); + AbstractDataMapWriter writer = factory.createWriter( + new Segment(segmentId, null), dataWritePath); if (writers != null) { writers.add(writer); } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java index 17e8dbe..29dfa40 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModelBuilder.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.text.SimpleDateFormat; +import java.util.ArrayList; +import java.util.HashMap; 
import java.util.List; import java.util.Map; @@ -100,6 +102,26 @@ public class CarbonLoadModelBuilder { Map<String, String> optionsFinal, CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws InvalidLoadOptionException, IOException { + build(options, optionsFinal, carbonLoadModel, hadoopConf, new HashMap<String, String>(), false); + } + + /** + * build CarbonLoadModel for data loading + * @param options Load options from user input + * @param optionsFinal Load options that populated with default values for optional options + * @param carbonLoadModel The output load model + * @param hadoopConf hadoopConf is needed to read CSV header if there 'fileheader' is not set in + * user provided load options + * @param partitions partition name map to path + * @param isDataFrame true if build for load for dataframe + */ + public void build( + Map<String, String> options, + Map<String, String> optionsFinal, + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + Map<String, String> partitions, + boolean isDataFrame) throws InvalidLoadOptionException, IOException { carbonLoadModel.setTableName(table.getTableName()); carbonLoadModel.setDatabaseName(table.getDatabaseName()); carbonLoadModel.setTablePath(table.getTablePath()); @@ -214,8 +236,18 @@ public class CarbonLoadModelBuilder { carbonLoadModel.setCsvDelimiter(CarbonUtil.unescapeChar(delimeter)); carbonLoadModel.setCsvHeader(fileHeader); carbonLoadModel.setColDictFilePath(column_dict); + + List<String> ignoreColumns = new ArrayList<>(); + if (!isDataFrame) { + for (Map.Entry<String, String> partition : partitions.entrySet()) { + if (partition.getValue() != null) { + ignoreColumns.add(partition.getKey()); + } + } + } + carbonLoadModel.setCsvHeaderColumns( - LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf)); + LoadOption.getCsvHeaderColumns(carbonLoadModel, hadoopConf, ignoreColumns)); int validatedMaxColumns = validateMaxColumns( carbonLoadModel.getCsvHeaderColumns(), 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java index 5af4859..bac1a94 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/LoadOption.java @@ -19,6 +19,8 @@ package org.apache.carbondata.processing.loading.model; import java.io.IOException; import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; import java.util.Map; import org.apache.carbondata.common.Maps; @@ -201,6 +203,16 @@ public class LoadOption { public static String[] getCsvHeaderColumns( CarbonLoadModel carbonLoadModel, Configuration hadoopConf) throws IOException { + return getCsvHeaderColumns(carbonLoadModel, hadoopConf, new LinkedList<String>()); + } + + /** + * Return CSV header field names, with partition column + */ + public static String[] getCsvHeaderColumns( + CarbonLoadModel carbonLoadModel, + Configuration hadoopConf, + List<String> staticPartitionCols) throws IOException { String delimiter; if (StringUtils.isEmpty(carbonLoadModel.getCsvDelimiter())) { delimiter = CarbonCommonConstants.COMMA; @@ -231,7 +243,7 @@ public class LoadOption { } if (!CarbonDataProcessorUtil.isHeaderValid(carbonLoadModel.getTableName(), csvColumns, - carbonLoadModel.getCarbonDataLoadSchema())) { + carbonLoadModel.getCarbonDataLoadSchema(), staticPartitionCols)) { if (csvFile == null) { LOG.error("CSV header in DDL is not proper." 
+ " Column names in schema and CSV header are not the same."); @@ -249,4 +261,5 @@ public class LoadOption { } return csvColumns; } + } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index d2faef5..142b2cb 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -21,7 +21,16 @@ import java.io.File; import java.io.IOException; import java.text.ParseException; import java.text.SimpleDateFormat; -import java.util.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -32,7 +41,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; @@ -592,10 +600,6 @@ public final class CarbonDataMergerUtil { List<LoadMetadataDetails> segmentsToBeMerged = new 
ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - CarbonTable carbonTable = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable(); - CarbonTableIdentifier tableIdentifier = carbonTable.getCarbonTableIdentifier(); - - // total length long totalLength = 0; @@ -1013,7 +1017,8 @@ public final class CarbonDataMergerUtil { CarbonFile[] updateDeltaFiles = null; Set<String> uniqueBlocks = new HashSet<String>(); - String segmentPath = CarbonTablePath.getSegmentPath(absoluteTableIdentifier.getTablePath(), seg.getSegmentNo()); + String segmentPath = CarbonTablePath.getSegmentPath( + absoluteTableIdentifier.getTablePath(), seg.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index ea11e22..ebcf944 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -405,8 +405,8 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + carbonLoadModel.getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(carbonLoadModel.getDatabaseName(), tableName, carbonLoadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + carbonLoadModel.getDatabaseName(), 
tableName, carbonLoadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 278d5bb..2616def 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -76,8 +76,8 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { partitionSpec.getLocation().toString() + CarbonCommonConstants.FILE_SEPARATOR + loadModel .getFactTimeStamp() + ".tmp"; } else { - carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); } CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java 
b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java index df2e2a2..221697f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/partition/spliter/RowResultProcessor.java @@ -47,8 +47,8 @@ public class RowResultProcessor { CarbonDataProcessorUtil.createLocations(tempStoreLocation); this.segmentProperties = segProp; String tableName = carbonTable.getTableName(); - String carbonStoreLocation = CarbonDataProcessorUtil - .createCarbonStoreLocation(loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); + String carbonStoreLocation = CarbonDataProcessorUtil.createCarbonStoreLocation( + loadModel.getDatabaseName(), tableName, loadModel.getSegmentId()); CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel.getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation, carbonStoreLocation); http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index dc8ffd7..19ad47d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -370,7 +370,8 @@ public final class CarbonDataProcessorUtil { * * @return data directory path */ - public static String createCarbonStoreLocation(String databaseName, String tableName, String segmentId) { + public static String createCarbonStoreLocation(String databaseName, String tableName, + String 
segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); return CarbonTablePath.getSegmentPath(carbonTable.getTablePath(), segmentId); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/7466d653/store/sdk/pom.xml ---------------------------------------------------------------------- diff --git a/store/sdk/pom.xml b/store/sdk/pom.xml index 54fba55..b3dd464 100644 --- a/store/sdk/pom.xml +++ b/store/sdk/pom.xml @@ -7,7 +7,7 @@ <parent> <groupId>org.apache.carbondata</groupId> <artifactId>carbondata-parent</artifactId> - <version>1.3.0-SNAPSHOT</version> + <version>1.4.0-SNAPSHOT</version> <relativePath>../../pom.xml</relativePath> </parent>