[CARBONDATA-1992] Remove partitionId in CarbonTablePath

In CarbonTablePath there is a deprecated partition id that is always 0; it should be removed to avoid confusion.
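As a minimal before/after sketch of the call-site change this patch applies throughout the codebase (method and variable names are taken from the first hunk below; the sketch is illustrative, not additional patch content):

    // before: every caller passed a hard-coded partition id "0"
    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName());

    // after: the partition id parameter is removed from CarbonTablePath
    String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName());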
This closes #1765

Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/d3cbb026
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/d3cbb026
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/d3cbb026

Branch: refs/heads/carbonstore-rebase
Commit: d3cbb026cc8ea66e8ec49beb83b7604575714c78
Parents: da549c2
Author: Jacky Li <jacky.li...@qq.com>
Authored: Sat Jan 6 20:28:44 2018 +0800
Committer: Jacky Li <jacky.li...@qq.com>
Committed: Sat Feb 10 02:20:09 2018 +0800

----------------------------------------------------------------------
 .../core/metadata/PartitionMapFileStore.java    |   2 +-
 .../core/mutate/CarbonUpdateUtil.java           |   8 +-
 .../core/statusmanager/LoadMetadataDetails.java |   2 +
 .../SegmentUpdateStatusManager.java             |   8 +-
 .../apache/carbondata/core/util/CarbonUtil.java |   6 +-
 .../core/util/path/CarbonTablePath.java         |  55 ++++---
 .../CarbonFormatDirectoryStructureTest.java     |   4 +-
 .../hadoop/api/CarbonTableInputFormat.java      |   2 +-
 .../streaming/CarbonStreamRecordWriter.java     |   2 +-
 .../hadoop/test/util/StoreCreator.java          |   1 -
 .../presto/util/CarbonDataStoreCreator.scala    |   1 -
 .../dataload/TestLoadDataGeneral.scala          |   2 +-
 .../InsertIntoCarbonTableTestCase.scala         |   4 +-
 .../dataload/TestBatchSortDataLoad.scala        |   3 +-
 .../dataload/TestDataLoadWithFileName.scala     |   2 +-
 .../dataload/TestGlobalSortDataLoad.scala       |   4 +-
 .../testsuite/datamap/TestDataMapCommand.scala  |  34 ++--
 .../TestDataLoadingForPartitionTable.scala      |   3 +-
 .../StandardPartitionTableCleanTestCase.scala   |   2 +-
 ...andardPartitionTableCompactionTestCase.scala |   2 +-
 .../StandardPartitionTableLoadingTestCase.scala |   4 +-
 .../load/DataLoadProcessBuilderOnSpark.scala    |   1 -
 .../load/DataLoadProcessorStepOnSpark.scala     |   2 +-
 .../spark/rdd/AlterTableLoadPartitionRDD.scala  | 154 +++++++++++--------
 .../carbondata/spark/rdd/CarbonMergerRDD.scala  |  11 +-
 .../spark/rdd/NewCarbonDataLoadRDD.scala        |  25 ++-
 .../org/apache/spark/util/PartitionUtils.scala  |   5 +-
 .../spark/rdd/CarbonDataRDDFactory.scala        |   5 +-
 .../datasources/CarbonFileFormat.scala          |   1 -
 .../partition/TestAlterPartitionTable.scala     |   2 +-
 .../bucketing/TableBucketingTestCase.scala      |   2 +
 .../loading/CarbonDataLoadConfiguration.java    |  10 --
 .../loading/DataLoadProcessBuilder.java         |   1 -
 .../loading/TableProcessingOperations.java      |   3 +-
 .../loading/model/CarbonLoadModel.java          |  72 +--------
 .../sort/impl/ParallelReadMergeSorterImpl.java  |   4 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  15 +-
 .../UnsafeBatchParallelReadMergeSorterImpl.java |   7 +-
 ...arallelReadMergeSorterWithBucketingImpl.java |  21 ++-
 .../CarbonRowDataWriterProcessorStepImpl.java   |  33 ++--
 .../steps/DataWriterBatchProcessorStepImpl.java |  25 +--
 .../steps/DataWriterProcessorStepImpl.java      |  22 +--
 .../processing/merger/CarbonDataMergerUtil.java |   6 +-
 .../merger/CompactionResultSortProcessor.java   |   4 +-
 .../sort/sortdata/SortParameters.java           |  16 +-
 .../store/CarbonFactDataHandlerModel.java       |   7 +-
 .../util/CarbonDataProcessorUtil.java           |  12 +-
 .../processing/util/CarbonLoaderUtil.java       |  12 +-
 .../processing/util/DeleteLoadFolders.java      |   7 +-
 .../carbondata/processing/StoreCreator.java     |   1 -
 .../carbondata/streaming/StreamHandoffRDD.scala |   1 -
 .../streaming/StreamSinkFactory.scala           |   2 +-
 .../streaming/CarbonAppendableStreamSink.scala  |   8 +-
 53 files changed, 285 insertions(+), 363 deletions(-)
----------------------------------------------------------------------
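Note that the on-disk layout is deliberately unchanged: getPartitionDir() still appends the legacy partition folder, now through the constant CarbonTablePath.DEPRECATED_PATITION_ID = "0" introduced in this patch, and callers that must still record a partition id (for example LoadMetadataDetails.setPartitionCount(), now @Deprecated) reuse the same constant. A sketch of the resulting path, using the database d1, table t1, and segment 2 from the updated CarbonFormatDirectoryStructureTest below and assuming a store located at /store:

    // layout kept for backward compatibility:
    // <tablePath>/Fact/Part0/Segment_<segmentId>/<data file>
    String path = carbonTablePath.getCarbonDataFilePath("2", 3, 4L, 0, 0, "999");
    // -> /store/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata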
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java index 1e9cbc4..43310fe 100644 --- a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java +++ b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java @@ -363,7 +363,7 @@ public class PartitionMapFileStore { List<String> toBeDeletedIndexFiles = new ArrayList<>(); List<String> toBeDeletedDataFiles = new ArrayList<>(); // take the list of files from this segment. - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName()); String partitionFilePath = getPartitionFilePath(segmentPath); if (partitionFilePath != null) { PartitionMapper partitionMapper = readPartitionMap(partitionFilePath); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index c5f61c2..e0b208f 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -297,9 +297,7 @@ public class CarbonUpdateUtil { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); - // as of now considering only partition 0. - String partitionId = "0"; - String partitionDir = carbonTablePath.getPartitionDir(partitionId); + String partitionDir = carbonTablePath.getPartitionDir(); CarbonFile file = FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir)); if (!file.exists()) { @@ -380,7 +378,7 @@ public class CarbonUpdateUtil { } public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) { - String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentDirPath = tablePath.getCarbonDataDirectoryPath(segmentId); // scan all the carbondata files and get the latest task ID. CarbonFile segment = @@ -445,7 +443,7 @@ public class CarbonUpdateUtil { || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { // take the list of files from this segment. 
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index 85602bc..73a665d 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -123,10 +123,12 @@ public class LoadMetadataDetails implements Serializable { */ private FileFormat fileFormat = FileFormat.COLUMNAR_V3; + @Deprecated public String getPartitionCount() { return partitionCount; } + @Deprecated public void setPartitionCount(String partitionCount) { this.partitionCount = partitionCount; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index e0e7b70..d4ef5c6 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -197,7 +197,7 @@ public class SegmentUpdateStatusManager { new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); String endTimeStamp = ""; String startTimeStamp = ""; - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); for (LoadMetadataDetails eachSeg : segmentDetails) { @@ -292,7 +292,7 @@ public class SegmentUpdateStatusManager { .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); String segment = CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.SEGMENT_ID); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment); + String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segment); String completeBlockName = CarbonTablePath.addDataPartPrefix( CarbonUpdateUtil.getRequiredFieldFromTID(tupleId, TupleIdEnum.BLOCK_ID) + CarbonCommonConstants.FACT_FILE_EXT); @@ -424,7 +424,7 @@ public class SegmentUpdateStatusManager { .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); @@ -867,7 +867,7 @@ public class SegmentUpdateStatusManager { // filter out the 
fact files. - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index c208154..83e7d52 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1103,7 +1103,7 @@ public final class CarbonUtil { // geting the index file path //TODO need to pass proper partition number when partiton will be supported String carbonIndexFilePath = carbonTablePath - .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(), bucketNumber, CarbonTablePath.DataFileUtil .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()), tableBlockInfoList.get(0).getVersion()); @@ -1346,7 +1346,7 @@ public final class CarbonUtil { // geting the index file path //TODO need to pass proper partition number when partiton will be supported String carbonIndexFilePath = carbonTablePath - .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(), bucketNumber, CarbonTablePath.DataFileUtil .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()), tableBlockInfoList.get(0).getVersion()); @@ -2303,7 +2303,7 @@ public final class CarbonUtil { long carbonDataSize = 0L; long carbonIndexSize = 0L; HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); FileFactory.FileType fileType = FileFactory.getFileType(segmentPath); switch (fileType) { case HDFS: http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index 5a63d2f..e107317 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -278,15 +278,14 @@ public class CarbonTablePath extends Path { /** * Gets absolute path of data file * - * @param partitionId unique partition identifier * @param segmentId unique partition identifier * @param filePartNo data file part number * @param factUpdateTimeStamp unique identifier to identify an update * @return absolute path of data file stored in carbon data format */ - public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo, - Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) { - return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName( + public String getCarbonDataFilePath(String segmentId, Integer 
filePartNo, Long taskNo, + int batchNo, int bucketNumber, String factUpdateTimeStamp) { + return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName( filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp); } @@ -295,13 +294,12 @@ public class CarbonTablePath extends Path { * based on task id * * @param taskId task id of the file - * @param partitionId partition number * @param segmentId segment number * @return full qualified carbon index path */ - public String getCarbonIndexFilePath(final String taskId, final String partitionId, - final String segmentId, final String bucketNumber) { - String segmentDir = getSegmentDir(partitionId, segmentId); + public String getCarbonIndexFilePath(final String taskId, final String segmentId, + final String bucketNumber) { + String segmentDir = getSegmentDir(segmentId); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)); @@ -317,9 +315,8 @@ public class CarbonTablePath extends Path { if (files.length > 0) { return files[0].getAbsolutePath(); } else { - throw new RuntimeException("Missing Carbon index file for partition[" - + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId - + "]"); + throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], " + + "taskId[" + taskId + "]"); } } @@ -327,8 +324,6 @@ public class CarbonTablePath extends Path { * Below method will be used to get the carbon index file path * @param taskId * task id - * @param partitionId - * partition id * @param segmentId * segment id * @param bucketNumber @@ -337,28 +332,27 @@ public class CarbonTablePath extends Path { * timestamp * @return carbon index file path */ - public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, - String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) { + public String getCarbonIndexFilePath(String taskId, String segmentId, String bucketNumber, + String timeStamp, ColumnarFormatVersion columnarFormatVersion) { switch (columnarFormatVersion) { case V1: case V2: - return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + return getCarbonIndexFilePath(taskId, segmentId, bucketNumber); default: - String segmentDir = getSegmentDir(partitionId, segmentId); + String segmentDir = getSegmentDir(segmentId); return segmentDir + File.separator + getCarbonIndexFileName(taskId, Integer.parseInt(bucketNumber), timeStamp); } } - public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, - int batchNo, String bucketNumber, String timeStamp, - ColumnarFormatVersion columnarFormatVersion) { + public String getCarbonIndexFilePath(String taskId, String segmentId, int batchNo, + String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) { switch (columnarFormatVersion) { case V1: case V2: - return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + return getCarbonIndexFilePath(taskId, segmentId, bucketNumber); default: - String segmentDir = getSegmentDir(partitionId, segmentId); + String segmentDir = getSegmentDir(segmentId); return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId), Integer.parseInt(bucketNumber), batchNo, timeStamp); } @@ -375,12 +369,11 @@ public class CarbonTablePath extends Path { /** * Gets absolute path of data file * - * @param partitionId unique partition identifier * @param segmentId unique partition identifier * @return absolute path of data file stored in carbon 
data format */ - public String getCarbonDataDirectoryPath(String partitionId, String segmentId) { - return getSegmentDir(partitionId, segmentId); + public String getCarbonDataDirectoryPath(String segmentId) { + return getSegmentDir(segmentId); } /** @@ -418,12 +411,16 @@ public class CarbonTablePath extends Path { return segmentDir + File.separator + getCarbonStreamIndexFileName(); } - public String getSegmentDir(String partitionId, String segmentId) { - return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId; + public String getSegmentDir(String segmentId) { + return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId; } - public String getPartitionDir(String partitionId) { - return getFactDir() + File.separator + PARTITION_PREFIX + partitionId; + // This partition is not used in any code logic, just keep backward compatibility + public static final String DEPRECATED_PATITION_ID = "0"; + + public String getPartitionDir() { + return getFactDir() + File.separator + PARTITION_PREFIX + + CarbonTablePath.DEPRECATED_PATITION_ID; } private String getMetaDataDir() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java index 5549806..a1ccab3 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java @@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest { .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta")); assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/") .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.sortindex")); - assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L, 0, 0, "999").replace("\\", "/") - .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata")); + assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L, 0, 0, "999").replace("\\", "/") + .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata")); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index a1887f0..6f1e123 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -480,7 +480,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (String segmentId : streamSegments) { - String segmentDir = tablePath.getSegmentDir("0", segmentId); + String segmentDir = tablePath.getSegmentDir(segmentId); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { String indexName = 
CarbonTablePath.getCarbonStreamIndexFileName(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java index 364a6a6..3ef8afc 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java @@ -129,7 +129,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); - segmentDir = tablePath.getSegmentDir("0", segmentId); + segmentDir = tablePath.getSegmentDir(segmentId); fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index fbf33d6..ac17c4e 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -154,7 +154,6 @@ public class StoreCreator { loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); loadModel.setTaskNo("0"); loadModel.setSegmentId("0"); - loadModel.setPartitionId("0"); loadModel.setFactTimeStamp(System.currentTimeMillis()); loadModel.setMaxColumns("10"); return loadModel; http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index 7b5c311..a41e738 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -135,7 +135,6 @@ object CarbonDataStoreCreator { loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(",")) loadModel.setTaskNo("0") loadModel.setSegmentId("0") - loadModel.setPartitionId("0") loadModel.setFactTimeStamp(System.currentTimeMillis()) loadModel.setMaxColumns("15") executeGraph(loadModel, storePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala index 09ca9e5..c84ae6b 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala @@ -49,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { tableName: String): Boolean = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName) val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath) val carbonFile = FileFactory.getCarbonFile(partitionPath, fileType) val segments: ArrayBuffer[String] = ArrayBuffer() http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index d59f0b5..5cc4156 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -232,7 +232,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite")) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite") val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val folder = new File(partitionPath) assert(folder.isDirectory) assert(folder.list().length == 1) @@ -255,7 +255,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite")) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite") val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val folder = new File(partitionPath) assert(folder.isDirectory) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala index 4af9d54..42ac4df 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala +++ 
b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala @@ -193,9 +193,8 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.DATABASE_DEFAULT_NAME, tableName ) - val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo) + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo) new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala index dae0962..db0a62c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala @@ -49,7 +49,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll { val indexReader = new CarbonIndexFileReader() val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3") val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0") + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0") val carbonIndexPaths = new File(segmentDir) .listFiles(new FilenameFilter { override def accept(dir: File, name: String): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 0d9e0fd..479db50 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -272,7 +272,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getSegmentDir("0", "0") + val segmentDir = carbonTablePath.getSegmentDir("0") assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) } @@ -379,7 +379,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo private def 
getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo) + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo) new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index 146ad62..5170c43 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -217,19 +217,27 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { } test("test if preaggregate load is successfull for hivemetastore") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("DROP TABLE IF EXISTS maintable") - sql( - """ - | CREATE TABLE maintable(id int, name string, city string, age int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - sql( - s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" - .stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") - checkAnswer(sql(s"select * from maintable_preagg_sum"), - Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" + + .stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + } finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } } test("test preaggregate load for decimal column for hivemetastore") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala index ed151bd..0a21aed 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala @@ -63,7 +63,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { @@ -87,6 +87,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, | utilization int,salary int) | PARTITIONED BY (empno int) + | | STORED BY 'org.apache.carbondata.format' | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3') """.stripMargin) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala index 2b0dd09..5427981 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCleanTestCase.scala @@ -53,7 +53,7 @@ class StandardPartitionTableCleanTestCase extends QueryTest with BeforeAndAfterA val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala index 22ebd80..f4b6e0e 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableCompactionTestCase.scala @@ -53,7 +53,7 @@ class StandardPartitionTableCompactionTestCase extends QueryTest with BeforeAndA val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala index 669d6e7..eb091f3 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/standardpartition/StandardPartitionTableLoadingTestCase.scala @@ -70,7 +70,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { @@ -335,7 +335,7 @@ class StandardPartitionTableLoadingTestCase extends QueryTest with BeforeAndAfte val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default_mergeindexpartitionthree") val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", "0") + val segmentDir = tablePath.getCarbonDataDirectoryPath("0") val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val files = carbonFile.listFiles(new CarbonFileFilter { override def accept(file: CarbonFile): Boolean = CarbonTablePath.isCarbonIndexFile(file.getName) http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 8be70a9..a5c1313 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala @@ -56,7 +56,6 @@ object DataLoadProcessBuilderOnSpark { .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } - model.setPartitionId("0") val sc = sparkSession.sparkContext val modelBroadcast = sc.broadcast(model) val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 21de003..834c1a6 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -194,7 +194,7 @@ object DataLoadProcessorStepOnSpark { dataWriter = new DataWriterProcessorStepImpl(conf) - val dataHandlerModel = dataWriter.getDataHandlerModel(0) + val dataHandlerModel = dataWriter.getDataHandlerModel var dataHandler: CarbonFactHandler = null var rowsNotExist = true while (rows.hasNext) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index 76c99f2..9de8dc9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -18,19 +18,21 @@ package org.apache.carbondata.spark.rdd import scala.collection.JavaConverters._ +import scala.util.Random -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Partition, SparkEnv, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.AlterPartitionModel import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.partition.spliter.RowResultProcessor import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.AlterPartitionResult -import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.spark.util.{CommonUtil, Util} class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, result: AlterPartitionResult[K, V], @@ -39,76 +41,96 @@ class 
AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, identifier: AbsoluteTableIdentifier, prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { - var storeLocation: String = null - val carbonLoadModel = alterPartitionModel.carbonLoadModel - val segmentId = alterPartitionModel.segmentId - val oldPartitionIds = alterPartitionModel.oldPartitionIds - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val databaseName = carbonTable.getDatabaseName - val factTableName = carbonTable.getTableName - val partitionInfo = carbonTable.getPartitionInfo(factTableName) + var storeLocation: String = null + val carbonLoadModel = alterPartitionModel.carbonLoadModel + val segmentId = alterPartitionModel.segmentId + val oldPartitionIds = alterPartitionModel.oldPartitionIds + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val databaseName = carbonTable.getDatabaseName + val factTableName = carbonTable.getTableName + val partitionInfo = carbonTable.getPartitionInfo(factTableName) - override protected def getPartitions: Array[Partition] = { - val sc = alterPartitionModel.sqlContext.sparkContext - sc.setLocalProperty("spark.scheduler.pool", "DDL") - sc.setLocalProperty("spark.job.interruptOnCancel", "true") - firstParent[Array[AnyRef]].partitions - } - - override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava - val iter = new Iterator[(K, V)] { - val partitionId = partitionInfo.getPartitionId(split.index) - carbonLoadModel.setTaskNo(String.valueOf(partitionId)) - carbonLoadModel.setSegmentId(segmentId) - carbonLoadModel.setPartitionId("0") - CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true) + override protected def getPartitions: Array[Partition] = { + val sc = alterPartitionModel.sqlContext.sparkContext + sc.setLocalProperty("spark.scheduler.pool", "DDL") + sc.setLocalProperty("spark.job.interruptOnCancel", "true") + firstParent[Array[AnyRef]].partitions + } - val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, - factTableName, - carbonLoadModel.getTaskNo, - "0", - segmentId, - false, - true - ) + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava + val iter = new Iterator[(K, V)] { + val partitionId = partitionInfo.getPartitionId(split.index) + carbonLoadModel.setTaskNo(String.valueOf(partitionId)) + carbonLoadModel.setSegmentId(segmentId) + CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true) + val tempLocationKey = CarbonDataProcessorUtil + .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, + segmentId, + carbonLoadModel.getTaskNo, + false, + true) + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. 
+ val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") - val loadStatus = if (rows.isEmpty) { - LOGGER.info("After repartition this split, NO target rows to write back.") - true - } else { - val segmentProperties = PartitionUtils.getSegmentProperties(identifier, - segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable) - val processor = new RowResultProcessor( - carbonTable, - carbonLoadModel, - segmentProperties, - tempStoreLoc, - bucketId) - try { - processor.execute(rows) - } catch { - case e: Exception => - sys.error(s"Exception when executing Row result processor ${e.getMessage}") - } finally { - TableProcessingOperations - .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) - } - } + if (carbonUseLocalDir.equalsIgnoreCase("true")) { - val loadResult = segmentId - var finished = false + val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) + LOGGER.info(s"Temp storeLocation taken is $storeLocation") - override def hasNext: Boolean = { - !finished - } + val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation( + databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true) - override def next(): (K, V) = { - finished = true - result.getKey(loadResult, loadStatus) - } + val loadStatus = if (rows.isEmpty) { + LOGGER.info("After repartition this split, NO target rows to write back.") + true + } else { + val segmentProperties = PartitionUtils.getSegmentProperties(identifier, + segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable) + val processor = new RowResultProcessor( + carbonTable, + carbonLoadModel, + segmentProperties, + tempStoreLoc, + bucketId) + try { + processor.execute(rows) + } catch { + case e: Exception => + sys.error(s"Exception when executing Row result processor ${ e.getMessage }") + } finally { + TableProcessingOperations + .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) } - iter + } + + val loadResult = segmentId + var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey(loadResult, loadStatus) + } } + iter + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index 0859f2e..a517bf4 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -179,16 +179,9 @@ class CarbonMergerRDD[K, V]( } } - val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, - factTableName, - carbonLoadModel.getTaskNo, - "0", - mergeNumber, - true, - 
false - ) + val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation( + databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false) - carbonLoadModel.setPartitionId("0") var processor: AbstractResultProcessor = null if (restructuredBlockExists) { LOGGER.info("CompactionResultSortProcessor flow is selected") http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 72d0484..1fa1689 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -43,6 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations} import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -129,7 +130,8 @@ class SparkPartitionLoader(model: CarbonLoadModel, System.setProperty("carbon.properties.filepath", System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") } - CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo( + CarbonTablePath.DEPRECATED_PATITION_ID) CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true") CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1") CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true") @@ -219,14 +221,13 @@ class NewCarbonDataLoadRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - var partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") var model: CarbonLoadModel = _ val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants @@ -264,7 +265,7 @@ class NewCarbonDataLoadRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + 
.printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } @@ -287,8 +288,8 @@ class NewCarbonDataLoadRDD[K, V]( val fileList: java.util.List[String] = new java.util.ArrayList[String]( CarbonCommonConstants.CONSTANT_SIZE_TEN) CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",") - model = carbonLoadModel.getCopyWithPartition(partitionID, fileList, - carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) + model = carbonLoadModel.getCopyWithPartition( + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) StandardLogService.setThreadName(StandardLogService .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName) , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "") @@ -351,7 +352,6 @@ class NewDataFrameLoaderRDD[K, V]( val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel @@ -359,9 +359,8 @@ class NewDataFrameLoaderRDD[K, V]( carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) - carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) carbonLoadModel.setPreFetch(false) @@ -406,7 +405,7 @@ class NewDataFrameLoaderRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } var finished = false @@ -542,7 +541,6 @@ class PartitionTableDataLoaderRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel @@ -552,9 +550,8 @@ class PartitionTableDataLoaderRDD[K, V]( carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) - carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index))) carbonLoadModel.setPreFetch(false) val recordReaders = Array[CarbonIterator[Array[AnyRef]]] { @@ -590,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
index 0498b25..3c871db 100644
--- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
+++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala
@@ -190,8 +190,9 @@ object PartitionUtils {
       val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo)
       val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo)
       val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path)
-      val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0",
-        segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version)
+      val indexFilePath = carbonTablePath.getCarbonIndexFilePath(
+        String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber),
+        timestamp, version)
       // indexFilePath could be duplicated when multiple data file related to one index file
       if (indexFilePath != null && !pathList.contains(indexFilePath)) {
         pathList.add(indexFilePath)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
index 8ed7623..3980c11 100644
--- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
+++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
@@ -615,7 +615,6 @@ object CarbonDataRDDFactory {

       override def getPartition(key: Any): Int = {
         val segId = key.asInstanceOf[String]
-        // partitionId
         segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism)
       }
     }
@@ -649,7 +648,6 @@ object CarbonDataRDDFactory {
     val rddResult = new updateResultImpl()
     val LOGGER = LogServiceFactory.getLogService(this.getClass.getName)
     val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] {
-      val partitionID = "0"
       val loadMetadataDetails = new LoadMetadataDetails
       val executionErrors = ExecutionErrors(FailureCauses.NONE, "")
       var uniqueLoadStatusId = ""
@@ -660,10 +658,9 @@ object CarbonDataRDDFactory {
           CarbonCommonConstants.UNDERSCORE +
           (index + "_0")

-        loadMetadataDetails.setPartitionCount(partitionID)
+        loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID)
         loadMetadataDetails.setLoadName(segId)
         loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE)
-        carbonLoadModel.setPartitionId(partitionID)
         carbonLoadModel.setSegmentId(segId)
         carbonLoadModel.setTaskNo(String.valueOf(index))
         carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp)
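Note: with the dummy id gone, an index file path is derived from task, segment, batch and bucket alone. A sketch of the narrowed call, mirroring the Scala caller in PartitionUtils above; the identifier values, the all-String parameter types, and the ColumnarFormatVersion argument are assumptions:

    import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    import org.apache.carbondata.core.metadata.ColumnarFormatVersion;
    import org.apache.carbondata.core.util.path.CarbonTablePath;

    public final class IndexFilePathSketch {
      public static void main(String[] args) {
        // Table identifier and store location are illustrative values.
        CarbonTableIdentifier id = new CarbonTableIdentifier("default", "t1", "uuid-1");
        CarbonTablePath tablePath = new CarbonTablePath(id, "/tmp/store/default/t1");
        // No "0" partition id between taskId and segmentId any more.
        String indexFilePath = tablePath.getCarbonIndexFilePath(
            "3",                                        // taskId
            "0",                                        // segmentId
            "0",                                        // batchNo
            "0",                                        // bucketNumber
            String.valueOf(System.currentTimeMillis()), // factTimeStamp
            ColumnarFormatVersion.V3);
        System.out.println(indexFilePath);
      }
    }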
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
index 17749c8..0c956e5 100644
--- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
+++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala
@@ -105,7 +105,6 @@ with Serializable {
       model,
       conf
     )
-    model.setPartitionId("0")
     model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean)
     model.setDictionaryServerHost(options.getOrElse("dicthost", null))
     model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt)

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
index b5325ef..aadee81 100644
--- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
+++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala
@@ -858,7 +858,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll {
   def getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = {
     val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier,
       carbonTable.getTablePath)
-    val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId)
+    val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId)
     val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir))
     val dataFiles = carbonFile.listFiles(new CarbonFileFilter() {
       override def accept(file: CarbonFile): Boolean = {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
----------------------------------------------------------------------
diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
index 102df39..9da7244 100644
--- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
+++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala
@@ -173,6 +173,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(!shuffleExists, "shuffle should not exist on bucket tables")
+    sql("DROP TABLE bucketed_parquet_table")
   }

   test("test create table with bucket join of carbon table and non bucket parquet table") {
@@ -197,6 +198,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll {
       case s: ShuffleExchange => shuffleExists = true
     }
     assert(shuffleExists, "shuffle should exist on non bucket tables")
+    sql("DROP TABLE parquet_table")
   }

   test("test scalar subquery with equal") {
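Note: the test helper above now resolves a segment directory from the segment id alone. A self-contained variant of that helper; the .carbondata suffix filter is an assumption about the elided accept body:

    import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
    import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter;
    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.util.path.CarbonTablePath;

    public final class SegmentDirSketch {
      // Lists the data files of one segment; no partition id argument needed.
      public static CarbonFile[] getDataFiles(CarbonTablePath tablePath, String segmentId) {
        String segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId);
        CarbonFile carbonFile =
            FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir));
        return carbonFile.listFiles(new CarbonFileFilter() {
          @Override public boolean accept(CarbonFile file) {
            // Assumed filter: keep only data files, as the test suggests.
            return file.getName().endsWith(".carbondata");
          }
        });
      }
    }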
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
index 7b1ab9d..e291f41 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java
@@ -40,8 +40,6 @@ public class CarbonDataLoadConfiguration {

   private String[] header;

-  private String partitionId;
-
   private String segmentId;

   private String taskNo;
@@ -189,14 +187,6 @@ public class CarbonDataLoadConfiguration {
     this.tableIdentifier = tableIdentifier;
   }

-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
   public String getSegmentId() {
     return segmentId;
   }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
index f7eff81..cf045a4 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java
@@ -152,7 +152,6 @@ public final class DataLoadProcessBuilder {
     configuration.setTableIdentifier(identifier);
     configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime());
     configuration.setHeader(loadModel.getCsvHeaderColumns());
-    configuration.setPartitionId(loadModel.getPartitionId());
     configuration.setSegmentId(loadModel.getSegmentId());
     configuration.setTaskNo(loadModel.getTaskNo());
     configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS,

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index e2be79c..a8db6c9 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -63,8 +63,7 @@ public class TableProcessingOperations {

     //delete folder which metadata no exist in tablestatus
     for (int i = 0; i < carbonTable.getPartitionCount(); i++) {
-      final String partitionCount = i + "";
-      String partitionPath = carbonTablePath.getPartitionDir(partitionCount);
+      String partitionPath = carbonTablePath.getPartitionDir();
       FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
       if (FileFactory.isFileExist(partitionPath, fileType)) {
         CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType);
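Note: since every table now has exactly one dummy partition folder, the cleanup above asks CarbonTablePath for it without an argument. A sketch of the same existence check in isolation; the identifier values are illustrative:

    import org.apache.carbondata.core.datastore.impl.FileFactory;
    import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
    import org.apache.carbondata.core.util.path.CarbonTablePath;

    public final class PartitionDirSketch {
      public static void main(String[] args) throws java.io.IOException {
        CarbonTableIdentifier id = new CarbonTableIdentifier("default", "t1", "uuid-1");
        CarbonTablePath tablePath = new CarbonTablePath(id, "/tmp/store/default/t1");
        // One fixed partition directory per table, not one per partition id.
        String partitionPath = tablePath.getPartitionDir();
        FileFactory.FileType fileType = FileFactory.getFileType(partitionPath);
        if (FileFactory.isFileExist(partitionPath, fileType)) {
          System.out.println("stale folder to clean: " + partitionPath);
        }
      }
    }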
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
index d41455f..fef2da6 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/model/CarbonLoadModel.java
@@ -39,8 +39,6 @@ public class CarbonLoadModel implements Serializable {

   private String colDictFilePath;

-  private String partitionId;
-
   private CarbonDataLoadSchema carbonDataLoadSchema;

   private boolean aggLoadRequest;
@@ -356,55 +354,6 @@ public class CarbonLoadModel implements Serializable {
   }

   /**
-   * get copy with partition
-   *
-   * @param uniqueId
-   * @return
-   */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId) {
-    CarbonLoadModel copy = new CarbonLoadModel();
-    copy.tableName = tableName;
-    copy.factFilePath = factFilePath + '/' + uniqueId;
-    copy.databaseName = databaseName;
-    copy.partitionId = uniqueId;
-    copy.aggLoadRequest = aggLoadRequest;
-    copy.loadMetadataDetails = loadMetadataDetails;
-    copy.isRetentionRequest = isRetentionRequest;
-    copy.complexDelimiterLevel1 = complexDelimiterLevel1;
-    copy.complexDelimiterLevel2 = complexDelimiterLevel2;
-    copy.carbonDataLoadSchema = carbonDataLoadSchema;
-    copy.blocksID = blocksID;
-    copy.taskNo = taskNo;
-    copy.factTimeStamp = factTimeStamp;
-    copy.segmentId = segmentId;
-    copy.serializationNullFormat = serializationNullFormat;
-    copy.badRecordsLoggerEnable = badRecordsLoggerEnable;
-    copy.badRecordsAction = badRecordsAction;
-    copy.escapeChar = escapeChar;
-    copy.quoteChar = quoteChar;
-    copy.commentChar = commentChar;
-    copy.timestampformat = timestampformat;
-    copy.dateFormat = dateFormat;
-    copy.defaultTimestampFormat = defaultTimestampFormat;
-    copy.maxColumns = maxColumns;
-    copy.tablePath = tablePath;
-    copy.useOnePass = useOnePass;
-    copy.dictionaryServerHost = dictionaryServerHost;
-    copy.dictionaryServerPort = dictionaryServerPort;
-    copy.dictionaryServerSecretKey = dictionaryServerSecretKey;
-    copy.dictionaryEncryptServerSecure = dictionaryEncryptServerSecure;
-    copy.dictionaryServiceProvider = dictionaryServiceProvider;
-    copy.preFetch = preFetch;
-    copy.isEmptyDataBadRecord = isEmptyDataBadRecord;
-    copy.skipEmptyLine = skipEmptyLine;
-    copy.sortScope = sortScope;
-    copy.batchSortSizeInMb = batchSortSizeInMb;
-    copy.badRecordsLocation = badRecordsLocation;
-    copy.isAggLoadRequest = isAggLoadRequest;
-    return copy;
-  }
-
-  /**
    * Get copy with taskNo.
    * Broadcast value is shared in process, so we need to copy it to make sure the value in each
    * task independently.
@@ -416,7 +365,6 @@ public class CarbonLoadModel implements Serializable {
     copy.tableName = tableName;
     copy.factFilePath = factFilePath;
     copy.databaseName = databaseName;
-    copy.partitionId = partitionId;
     copy.aggLoadRequest = aggLoadRequest;
     copy.loadMetadataDetails = loadMetadataDetails;
     copy.isRetentionRequest = isRetentionRequest;
@@ -460,19 +408,15 @@ public class CarbonLoadModel implements Serializable {
   /**
    * get CarbonLoadModel with partition
    *
-   * @param uniqueId
-   * @param filesForPartition
    * @param header
    * @param delimiter
    * @return
    */
-  public CarbonLoadModel getCopyWithPartition(String uniqueId, List<String> filesForPartition,
-      String header, String delimiter) {
+  public CarbonLoadModel getCopyWithPartition(String header, String delimiter) {
     CarbonLoadModel copyObj = new CarbonLoadModel();
     copyObj.tableName = tableName;
     copyObj.factFilePath = null;
     copyObj.databaseName = databaseName;
-    copyObj.partitionId = uniqueId;
     copyObj.aggLoadRequest = aggLoadRequest;
     copyObj.loadMetadataDetails = loadMetadataDetails;
     copyObj.isRetentionRequest = isRetentionRequest;
@@ -514,20 +458,6 @@ public class CarbonLoadModel implements Serializable {
   }

   /**
-   * @return the partitionId
-   */
-  public String getPartitionId() {
-    return partitionId;
-  }
-
-  /**
-   * @param partitionId the partitionId to set
-   */
-  public void setPartitionId(String partitionId) {
-    this.partitionId = partitionId;
-  }
-
-  /**
    * @param tablePath The tablePath to set.
    */
   public void setTablePath(String tablePath) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
index 6432d38..fcc88b5 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java
@@ -73,8 +73,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter {
     String[] storeLocations = CarbonDataProcessorUtil.getLocalDataFolderLocation(
         sortParameters.getDatabaseName(), sortParameters.getTableName(),
-        String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(),
-        sortParameters.getSegmentId() + "", false, false);
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
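Note: callers that previously passed a uniqueId and a file list now hand over only the CSV header and delimiter, as in NewCarbonDataLoadRDD above. A minimal sketch of the surviving overload; the wrapper class is illustrative:

    import org.apache.carbondata.processing.loading.model.CarbonLoadModel;

    public final class LoadModelCopySketch {
      // Copies a load model for a CSV read; partition-specific inputs are gone.
      public static CarbonLoadModel copyForCsv(CarbonLoadModel model) {
        return model.getCopyWithPartition(model.getCsvHeader(), model.getCsvDelimiter());
      }
    }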
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
index c7030dd..b7452a7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java
@@ -133,10 +133,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }

   private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -181,10 +181,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte
   }

   private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(),
-            parameters.getPartitionID(), parameters.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+        parameters.getSegmentId(), false, false);
     String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tmpLocs);

http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
index c5579d9..ed3a55d 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java
@@ -219,10 +219,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter
   }

   private void setTempLocation(SortParameters parameters) {
-    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(parameters.getDatabaseName(),
-            parameters.getTableName(), parameters.getTaskNo(), batchCount + "",
-            parameters.getSegmentId(), false, false);
+    String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(),
+        parameters.getSegmentId(), false, false);
     String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
     parameters.setTempFileLocation(tempDirs);
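Note: all sorter implementations now build their temp locations from the same six-argument call. A sketch of the shared pattern; the meaning of the two boolean flags (compaction flow, alter-partition flow) is inferred from the call sites and should be treated as an assumption:

    import java.io.File;

    import org.apache.carbondata.core.constants.CarbonCommonConstants;
    import org.apache.carbondata.processing.util.CarbonDataProcessorUtil;

    public final class TempStoreSketch {
      // Resolves the local sort-temp directories for one task of one segment.
      public static String[] sortTempDirs(
          String databaseName, String tableName, String taskNo, String segmentId) {
        String[] storeLocations = CarbonDataProcessorUtil.getLocalDataFolderLocation(
            databaseName, tableName, taskNo, segmentId,
            false /* compaction flow? */, false /* alter-partition flow? */);
        return CarbonDataProcessorUtil.arrayAppend(
            storeLocations, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
      }
    }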
http://git-wip-us.apache.org/repos/asf/carbondata/blob/d3cbb026/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
index 3c48e4d..f605b22 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java
@@ -119,18 +119,17 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg

     Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()];
     for (int i = 0; i < sortDataRows.length; i++) {
-      batchIterator[i] =
-          new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]);
+      batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]);
     }
     return batchIterator;
   }

-  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) {
-    String[] storeLocation = CarbonDataProcessorUtil
-        .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(),
-            String.valueOf(sortParameters.getTaskNo()), bucketId,
-            sortParameters.getSegmentId() + "", false, false);
+  private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() {
+    String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation(
+        sortParameters.getDatabaseName(), sortParameters.getTableName(),
+        String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(),
+        false, false);
     // Set the data file location
     String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -173,7 +172,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
   private void setTempLocation(SortParameters parameters) {
     String[] carbonDataDirectoryPath = CarbonDataProcessorUtil
         .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(),
-            parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(),
+            parameters.getTaskNo(), parameters.getSegmentId(),
             false, false);
     String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath,
         File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION);
@@ -224,7 +223,6 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg

   private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> {

-    private String partitionId;

     private int batchSize;

@@ -232,9 +230,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg

     private UnsafeIntermediateMerger intermediateMerger;

-    public MergedDataIterator(String partitionId, int batchSize,
+    public MergedDataIterator(int batchSize,
         UnsafeIntermediateMerger intermediateMerger) {
-      this.partitionId = partitionId;
       this.batchSize = batchSize;
       this.intermediateMerger = intermediateMerger;
       this.firstRow = true;
@@ -245,7 +242,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg
     @Override public boolean hasNext() {
       if (firstRow) {
         firstRow = false;
-        finalMerger = getFinalMerger(partitionId);
+        finalMerger = getFinalMerger();
         List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages();
         finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]),
             intermediateMerger.getMergedPages());