[CARBONDATA-1992] Remove partitionId in CarbonTablePath In CarbonTablePath, there is a deprecated partition id which is always 0; it should be removed to avoid confusion.
This closes #1765 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/bf3602fc Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/bf3602fc Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/bf3602fc Branch: refs/heads/carbonstore-rebase4 Commit: bf3602fc2814b596f19c8493e49d75d94c68dff5 Parents: 8d3c774 Author: Jacky Li <jacky.li...@qq.com> Authored: Sat Jan 6 20:28:44 2018 +0800 Committer: Jacky Li <jacky.li...@qq.com> Committed: Mon Feb 26 23:55:27 2018 +0800 ---------------------------------------------------------------------- .../core/metadata/PartitionMapFileStore.java | 0 .../core/mutate/CarbonUpdateUtil.java | 8 +- .../core/statusmanager/LoadMetadataDetails.java | 1 + .../SegmentUpdateStatusManager.java | 6 +- .../apache/carbondata/core/util/CarbonUtil.java | 6 +- .../core/util/path/CarbonTablePath.java | 55 ++++--- .../CarbonFormatDirectoryStructureTest.java | 4 +- .../hadoop/api/CarbonTableInputFormat.java | 2 +- .../streaming/CarbonStreamRecordWriter.java | 2 +- .../hadoop/test/util/StoreCreator.java | 1 - .../presto/util/CarbonDataStoreCreator.scala | 1 - .../dataload/TestLoadDataGeneral.scala | 2 +- .../InsertIntoCarbonTableTestCase.scala | 4 +- .../dataload/TestBatchSortDataLoad.scala | 3 +- .../dataload/TestDataLoadWithFileName.scala | 2 +- .../dataload/TestGlobalSortDataLoad.scala | 4 +- .../testsuite/datamap/TestDataMapCommand.scala | 34 ++-- .../TestDataLoadingForPartitionTable.scala | 3 +- .../load/DataLoadProcessBuilderOnSpark.scala | 1 - .../load/DataLoadProcessorStepOnSpark.scala | 2 +- .../spark/rdd/AlterTableLoadPartitionRDD.scala | 154 +++++++++++-------- .../carbondata/spark/rdd/CarbonMergerRDD.scala | 11 +- .../spark/rdd/NewCarbonDataLoadRDD.scala | 25 ++- .../org/apache/spark/util/PartitionUtils.scala | 5 +- .../spark/rdd/CarbonDataRDDFactory.scala | 5 +- .../datasources/CarbonFileFormat.scala | 1 - .../partition/TestAlterPartitionTable.scala 
| 2 +- .../bucketing/TableBucketingTestCase.scala | 2 + .../loading/CarbonDataLoadConfiguration.java | 10 -- .../loading/DataLoadProcessBuilder.java | 1 - .../loading/TableProcessingOperations.java | 3 +- .../sort/impl/ParallelReadMergeSorterImpl.java | 4 +- ...arallelReadMergeSorterWithBucketingImpl.java | 15 +- .../UnsafeBatchParallelReadMergeSorterImpl.java | 7 +- ...arallelReadMergeSorterWithBucketingImpl.java | 21 ++- .../CarbonRowDataWriterProcessorStepImpl.java | 33 ++-- .../steps/DataWriterBatchProcessorStepImpl.java | 25 +-- .../steps/DataWriterProcessorStepImpl.java | 22 +-- .../processing/merger/CarbonDataMergerUtil.java | 6 +- .../merger/CompactionResultSortProcessor.java | 4 +- .../sort/sortdata/SortParameters.java | 16 +- .../store/CarbonFactDataHandlerModel.java | 3 +- .../util/CarbonDataProcessorUtil.java | 9 +- .../processing/util/CarbonLoaderUtil.java | 12 +- .../processing/util/DeleteLoadFolders.java | 7 +- .../carbondata/processing/StoreCreator.java | 1 - .../carbondata/streaming/StreamHandoffRDD.scala | 1 - .../streaming/StreamSinkFactory.scala | 2 +- .../streaming/CarbonAppendableStreamSink.scala | 8 +- 49 files changed, 274 insertions(+), 282 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java b/core/src/main/java/org/apache/carbondata/core/metadata/PartitionMapFileStore.java new file mode 100644 index 0000000..e69de29 http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java ---------------------------------------------------------------------- diff --git 
a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java index de98fa8..18eae11 100644 --- a/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java @@ -319,9 +319,7 @@ public class CarbonUpdateUtil { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); - // as of now considering only partition 0. - String partitionId = "0"; - String partitionDir = carbonTablePath.getPartitionDir(partitionId); + String partitionDir = carbonTablePath.getPartitionDir(); CarbonFile file = FileFactory.getCarbonFile(partitionDir, FileFactory.getFileType(partitionDir)); if (!file.exists()) { @@ -402,7 +400,7 @@ public class CarbonUpdateUtil { } public static long getLatestTaskIdForSegment(String segmentId, CarbonTablePath tablePath) { - String segmentDirPath = tablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentDirPath = tablePath.getCarbonDataDirectoryPath(segmentId); // scan all the carbondata files and get the latest task ID. CarbonFile segment = @@ -467,7 +465,7 @@ public class CarbonUpdateUtil { || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) { // take the list of files from this segment. 
- String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segment.getLoadName()); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segment.getLoadName()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java index a0fa67d..b6a9e36 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/LoadMetadataDetails.java @@ -132,6 +132,7 @@ public class LoadMetadataDetails implements Serializable { return partitionCount; } + @Deprecated public void setPartitionCount(String partitionCount) { this.partitionCount = partitionCount; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java index 71b6ba8..2edb379 100644 --- a/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java +++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentUpdateStatusManager.java @@ -202,7 +202,7 @@ public class SegmentUpdateStatusManager { new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); String endTimeStamp = ""; String startTimeStamp = ""; - 
String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); for (LoadMetadataDetails eachSeg : segmentDetails) { @@ -437,7 +437,7 @@ public class SegmentUpdateStatusManager { .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), absoluteTableIdentifier.getCarbonTableIdentifier()); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId.getSegmentNo()); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId.getSegmentNo()); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); @@ -880,7 +880,7 @@ public class SegmentUpdateStatusManager { // filter out the fact files. - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java index eb0a9d7..52305bd 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java @@ -1105,7 +1105,7 @@ public final class CarbonUtil { // geting the index file path //TODO need to pass proper partition number when partiton will be supported String carbonIndexFilePath = carbonTablePath - .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + .getCarbonIndexFilePath(taskId, 
tableBlockInfoList.get(0).getSegmentId(), bucketNumber, CarbonTablePath.DataFileUtil .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()), tableBlockInfoList.get(0).getVersion()); @@ -1348,7 +1348,7 @@ public final class CarbonUtil { // geting the index file path //TODO need to pass proper partition number when partiton will be supported String carbonIndexFilePath = carbonTablePath - .getCarbonIndexFilePath(taskId, "0", tableBlockInfoList.get(0).getSegmentId(), + .getCarbonIndexFilePath(taskId, tableBlockInfoList.get(0).getSegmentId(), bucketNumber, CarbonTablePath.DataFileUtil .getTimeStampFromFileName(tableBlockInfoList.get(0).getFilePath()), tableBlockInfoList.get(0).getVersion()); @@ -2305,7 +2305,7 @@ public final class CarbonUtil { long carbonDataSize = 0L; long carbonIndexSize = 0L; HashMap<String, Long> dataAndIndexSize = new HashMap<String, Long>(); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); FileFactory.FileType fileType = FileFactory.getFileType(segmentPath); switch (fileType) { case HDFS: http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java index d70d9ef..293257b 100644 --- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java +++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java @@ -267,15 +267,14 @@ public class CarbonTablePath extends Path { /** * Gets absolute path of data file * - * @param partitionId unique partition identifier * @param segmentId unique partition identifier * @param filePartNo data file part number * @param factUpdateTimeStamp 
unique identifier to identify an update * @return absolute path of data file stored in carbon data format */ - public String getCarbonDataFilePath(String partitionId, String segmentId, Integer filePartNo, - Long taskNo, int batchNo, int bucketNumber, String factUpdateTimeStamp) { - return getSegmentDir(partitionId, segmentId) + File.separator + getCarbonDataFileName( + public String getCarbonDataFilePath(String segmentId, Integer filePartNo, Long taskNo, + int batchNo, int bucketNumber, String factUpdateTimeStamp) { + return getSegmentDir(segmentId) + File.separator + getCarbonDataFileName( filePartNo, taskNo, bucketNumber, batchNo, factUpdateTimeStamp); } @@ -284,13 +283,12 @@ public class CarbonTablePath extends Path { * based on task id * * @param taskId task id of the file - * @param partitionId partition number * @param segmentId segment number * @return full qualified carbon index path */ - public String getCarbonIndexFilePath(final String taskId, final String partitionId, - final String segmentId, final String bucketNumber) { - String segmentDir = getSegmentDir(partitionId, segmentId); + public String getCarbonIndexFilePath(final String taskId, final String segmentId, + final String bucketNumber) { + String segmentDir = getSegmentDir(segmentId); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)); @@ -306,9 +304,8 @@ public class CarbonTablePath extends Path { if (files.length > 0) { return files[0].getAbsolutePath(); } else { - throw new RuntimeException("Missing Carbon index file for partition[" - + partitionId + "] Segment[" + segmentId + "], taskId[" + taskId - + "]"); + throw new RuntimeException("Missing Carbon index file for Segment[" + segmentId + "], " + + "taskId[" + taskId + "]"); } } @@ -316,8 +313,6 @@ public class CarbonTablePath extends Path { * Below method will be used to get the carbon index file path * @param taskId * task id - * @param partitionId - * partition id * @param segmentId * segment 
id * @param bucketNumber @@ -326,28 +321,27 @@ public class CarbonTablePath extends Path { * timestamp * @return carbon index file path */ - public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, - String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) { + public String getCarbonIndexFilePath(String taskId, String segmentId, String bucketNumber, + String timeStamp, ColumnarFormatVersion columnarFormatVersion) { switch (columnarFormatVersion) { case V1: case V2: - return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + return getCarbonIndexFilePath(taskId, segmentId, bucketNumber); default: - String segmentDir = getSegmentDir(partitionId, segmentId); + String segmentDir = getSegmentDir(segmentId); return segmentDir + File.separator + getCarbonIndexFileName(taskId, Integer.parseInt(bucketNumber), timeStamp); } } - public String getCarbonIndexFilePath(String taskId, String partitionId, String segmentId, - int batchNo, String bucketNumber, String timeStamp, - ColumnarFormatVersion columnarFormatVersion) { + public String getCarbonIndexFilePath(String taskId, String segmentId, int batchNo, + String bucketNumber, String timeStamp, ColumnarFormatVersion columnarFormatVersion) { switch (columnarFormatVersion) { case V1: case V2: - return getCarbonIndexFilePath(taskId, partitionId, segmentId, bucketNumber); + return getCarbonIndexFilePath(taskId, segmentId, bucketNumber); default: - String segmentDir = getSegmentDir(partitionId, segmentId); + String segmentDir = getSegmentDir(segmentId); return segmentDir + File.separator + getCarbonIndexFileName(Long.parseLong(taskId), Integer.parseInt(bucketNumber), batchNo, timeStamp); } @@ -364,12 +358,11 @@ public class CarbonTablePath extends Path { /** * Gets absolute path of data file * - * @param partitionId unique partition identifier * @param segmentId unique partition identifier * @return absolute path of data file stored in carbon data format 
*/ - public String getCarbonDataDirectoryPath(String partitionId, String segmentId) { - return getSegmentDir(partitionId, segmentId); + public String getCarbonDataDirectoryPath(String segmentId) { + return getSegmentDir(segmentId); } /** @@ -407,12 +400,16 @@ public class CarbonTablePath extends Path { return segmentDir + File.separator + getCarbonStreamIndexFileName(); } - public String getSegmentDir(String partitionId, String segmentId) { - return getPartitionDir(partitionId) + File.separator + SEGMENT_PREFIX + segmentId; + public String getSegmentDir(String segmentId) { + return getPartitionDir() + File.separator + SEGMENT_PREFIX + segmentId; } - public String getPartitionDir(String partitionId) { - return getFactDir() + File.separator + PARTITION_PREFIX + partitionId; + // This partition is not used in any code logic, just keep backward compatibility + public static final String DEPRECATED_PATITION_ID = "0"; + + public String getPartitionDir() { + return getFactDir() + File.separator + PARTITION_PREFIX + + CarbonTablePath.DEPRECATED_PATITION_ID; } private String getMetaDataDir() { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java index 5549806..a1ccab3 100644 --- a/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java +++ b/core/src/test/java/org/apache/carbondata/core/util/path/CarbonFormatDirectoryStructureTest.java @@ -53,8 +53,8 @@ public class CarbonFormatDirectoryStructureTest { .equals(CARBON_STORE + "/d1/t1/Metadata/t1_c1.dictmeta")); assertTrue(carbonTablePath.getSortIndexFilePath("t1_c1").replace("\\", "/") .equals(CARBON_STORE + 
"/d1/t1/Metadata/t1_c1.sortindex")); - assertTrue(carbonTablePath.getCarbonDataFilePath("1", "2", 3, 4L, 0, 0, "999").replace("\\", "/") - .equals(CARBON_STORE + "/d1/t1/Fact/Part1/Segment_2/part-3-4_batchno0-0-999.carbondata")); + assertTrue(carbonTablePath.getCarbonDataFilePath("2", 3, 4L, 0, 0, "999").replace("\\", "/") + .equals(CARBON_STORE + "/d1/t1/Fact/Part0/Segment_2/part-3-4_batchno0-0-999.carbondata")); } /** http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java index 96b0b21..069e1f7 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableInputFormat.java @@ -499,7 +499,7 @@ public class CarbonTableInputFormat<T> extends FileInputFormat<Void, T> { long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job)); long maxSize = getMaxSplitSize(job); for (Segment segment : streamSegments) { - String segmentDir = tablePath.getSegmentDir("0", segment.getSegmentNo()); + String segmentDir = tablePath.getSegmentDir(segment.getSegmentNo()); FileFactory.FileType fileType = FileFactory.getFileType(segmentDir); if (FileFactory.isFileExist(segmentDir, fileType)) { String indexName = CarbonTablePath.getCarbonStreamIndexFileName(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java ---------------------------------------------------------------------- diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java index 364a6a6..3ef8afc 100644 --- a/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java +++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/streaming/CarbonStreamRecordWriter.java @@ -129,7 +129,7 @@ public class CarbonStreamRecordWriter extends RecordWriter<Void, Object> { CarbonTablePath tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); - segmentDir = tablePath.getSegmentDir("0", segmentId); + segmentDir = tablePath.getSegmentDir(segmentId); fileName = CarbonTablePath.getCarbonDataFileName(0, taskNo, 0, 0, "0"); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java ---------------------------------------------------------------------- diff --git a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java index fbf33d6..ac17c4e 100644 --- a/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java +++ b/hadoop/src/test/java/org/apache/carbondata/hadoop/test/util/StoreCreator.java @@ -154,7 +154,6 @@ public class StoreCreator { loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); loadModel.setTaskNo("0"); loadModel.setSegmentId("0"); - loadModel.setPartitionId("0"); loadModel.setFactTimeStamp(System.currentTimeMillis()); loadModel.setMaxColumns("10"); return loadModel; http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala ---------------------------------------------------------------------- diff --git a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala 
b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala index 7b5c311..a41e738 100644 --- a/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala +++ b/integration/presto/src/test/scala/org/apache/carbondata/presto/util/CarbonDataStoreCreator.scala @@ -135,7 +135,6 @@ object CarbonDataStoreCreator { loadModel.setCsvHeaderColumns(loadModel.getCsvHeader.split(",")) loadModel.setTaskNo("0") loadModel.setSegmentId("0") - loadModel.setPartitionId("0") loadModel.setFactTimeStamp(System.currentTimeMillis()) loadModel.setMaxColumns("15") executeGraph(loadModel, storePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala index 09ca9e5..c84ae6b 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/dataload/TestLoadDataGeneral.scala @@ -49,7 +49,7 @@ class TestLoadDataGeneral extends QueryTest with BeforeAndAfterEach { tableName: String): Boolean = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable(datbaseName, tableName) val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val fileType: FileFactory.FileType = FileFactory.getFileType(partitionPath) val carbonFile = 
FileFactory.getCarbonFile(partitionPath, fileType) val segments: ArrayBuffer[String] = ArrayBuffer() http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala index d59f0b5..5cc4156 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/allqueries/InsertIntoCarbonTableTestCase.scala @@ -232,7 +232,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select count(*) from CarbonOverwrite"), sql("select count(*) from HiveOverwrite")) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbonoverwrite") val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val folder = new File(partitionPath) assert(folder.isDirectory) assert(folder.list().length == 1) @@ -255,7 +255,7 @@ class InsertIntoCarbonTableTestCase extends QueryTest with BeforeAndAfterAll { checkAnswer(sql("select count(*) from TCarbonSourceOverwrite"), sql("select count(*) from HiveOverwrite")) val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "tcarbonsourceoverwrite") val partitionPath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir("0") + 
.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier).getPartitionDir val folder = new File(partitionPath) assert(folder.isDirectory) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala index 4af9d54..42ac4df 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestBatchSortDataLoad.scala @@ -193,9 +193,8 @@ class TestBatchSortDataLoad extends QueryTest with BeforeAndAfterAll { CarbonCommonConstants.DATABASE_DEFAULT_NAME, tableName ) - val absoluteTableIdentifier = carbonTable.getAbsoluteTableIdentifier val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo) + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo) new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala index 
dae0962..db0a62c 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestDataLoadWithFileName.scala @@ -49,7 +49,7 @@ class TestDataLoadWithFileName extends QueryTest with BeforeAndAfterAll { val indexReader = new CarbonIndexFileReader() val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "test_table_v3") val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", "0") + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0") val carbonIndexPaths = new File(segmentDir) .listFiles(new FilenameFilter { override def accept(dir: File, name: String): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala index 0d9e0fd..479db50 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/dataload/TestGlobalSortDataLoad.scala @@ -272,7 +272,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo sql(s"LOAD DATA LOCAL INPATH '$inputPath' INTO TABLE carbon_globalsort") val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", "carbon_globalsort") val carbonTablePath = 
CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getSegmentDir("0", "0") + val segmentDir = carbonTablePath.getSegmentDir("0") assertResult(Math.max(7, defaultParallelism) + 1)(new File(segmentDir).listFiles().length) } @@ -379,7 +379,7 @@ class TestGlobalSortDataLoad extends QueryTest with BeforeAndAfterEach with Befo private def getIndexFileCount(tableName: String, segmentNo: String = "0"): Int = { val carbonTable = CarbonMetadata.getInstance().getCarbonTable("default", tableName) val carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = carbonTablePath.getCarbonDataDirectoryPath("0", segmentNo) + val segmentDir = carbonTablePath.getCarbonDataDirectoryPath(segmentNo) new SegmentIndexFileStore().getIndexFilesFromSegment(segmentDir).size() } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala index f403b3e..d60b7db 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/datamap/TestDataMapCommand.scala @@ -220,19 +220,27 @@ class TestDataMapCommand extends QueryTest with BeforeAndAfterAll { } test("test if preaggregate load is successfull for hivemetastore") { - CarbonProperties.getInstance().addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") - sql("DROP TABLE IF EXISTS maintable") - sql( - """ - | CREATE TABLE maintable(id int, 
name string, city string, age int) - | STORED BY 'org.apache.carbondata.format' - """.stripMargin) - sql( - s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" - .stripMargin) - sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") - checkAnswer(sql(s"select * from maintable_preagg_sum"), - Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + try { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, "true") + sql("DROP TABLE IF EXISTS maintable") + sql( + """ + | CREATE TABLE maintable(id int, name string, city string, age int) + | STORED BY 'org.apache.carbondata.format' + """.stripMargin) + sql( + s"""create datamap preagg_sum on table maintable using 'preaggregate' as select id,sum(age) from maintable group by id""" + + .stripMargin) + sql(s"LOAD DATA LOCAL INPATH '$testData' into table maintable") + checkAnswer(sql(s"select * from maintable_preagg_sum"), + Seq(Row(1, 31), Row(2, 27), Row(3, 70), Row(4, 55))) + } finally { + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE, + CarbonCommonConstants.ENABLE_HIVE_SCHEMA_META_STORE_DEFAULT) + } } test("test preaggregate load for decimal column for hivemetastore") { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala index ed151bd..0a21aed 100644 --- 
a/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestDataLoadingForPartitionTable.scala @@ -63,7 +63,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll val carbonTable = CarbonMetadata.getInstance().getCarbonTable(tableUniqueName) val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { @@ -87,6 +87,7 @@ class TestDataLoadingForPartitionTable extends QueryTest with BeforeAndAfterAll | projectcode int, projectjoindate Timestamp, projectenddate Timestamp,attendance int, | utilization int,salary int) | PARTITIONED BY (empno int) + | | STORED BY 'org.apache.carbondata.format' | TBLPROPERTIES('PARTITION_TYPE'='HASH','NUM_PARTITIONS'='3') """.stripMargin) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala index 30e4fc9..e1bd84b 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessBuilderOnSpark.scala 
@@ -56,7 +56,6 @@ object DataLoadProcessBuilderOnSpark { .map(DataLoadProcessorStepOnSpark.toStringArrayRow(_, columnCount)) } - model.setPartitionId("0") val sc = sparkSession.sparkContext val modelBroadcast = sc.broadcast(model) val partialSuccessAccum = sc.accumulator(0, "Partial Success Accumulator") http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala index 4b7d3f7..5124247 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/load/DataLoadProcessorStepOnSpark.scala @@ -248,7 +248,7 @@ object DataLoadProcessorStepOnSpark { dataWriter = new DataWriterProcessorStepImpl(conf) - val dataHandlerModel = dataWriter.getDataHandlerModel(0) + val dataHandlerModel = dataWriter.getDataHandlerModel var dataHandler: CarbonFactHandler = null var rowsNotExist = true while (rows.hasNext) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala index 76c99f2..9de8dc9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala +++ 
b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/AlterTableLoadPartitionRDD.scala @@ -18,19 +18,21 @@ package org.apache.carbondata.spark.rdd import scala.collection.JavaConverters._ +import scala.util.Random -import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.{Partition, SparkEnv, TaskContext} import org.apache.spark.rdd.RDD import org.apache.spark.sql.execution.command.AlterPartitionModel import org.apache.spark.util.PartitionUtils import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier +import org.apache.carbondata.core.util.CarbonProperties import org.apache.carbondata.processing.loading.TableProcessingOperations import org.apache.carbondata.processing.partition.spliter.RowResultProcessor import org.apache.carbondata.processing.util.{CarbonDataProcessorUtil, CarbonLoaderUtil} import org.apache.carbondata.spark.AlterPartitionResult -import org.apache.carbondata.spark.util.CommonUtil +import org.apache.carbondata.spark.util.{CommonUtil, Util} class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, result: AlterPartitionResult[K, V], @@ -39,76 +41,96 @@ class AlterTableLoadPartitionRDD[K, V](alterPartitionModel: AlterPartitionModel, identifier: AbsoluteTableIdentifier, prev: RDD[Array[AnyRef]]) extends RDD[(K, V)](prev) { - var storeLocation: String = null - val carbonLoadModel = alterPartitionModel.carbonLoadModel - val segmentId = alterPartitionModel.segmentId - val oldPartitionIds = alterPartitionModel.oldPartitionIds - val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable - val databaseName = carbonTable.getDatabaseName - val factTableName = carbonTable.getTableName - val partitionInfo = carbonTable.getPartitionInfo(factTableName) + var storeLocation: String = null + val carbonLoadModel = alterPartitionModel.carbonLoadModel + val segmentId = alterPartitionModel.segmentId + val oldPartitionIds = 
alterPartitionModel.oldPartitionIds + val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable + val databaseName = carbonTable.getDatabaseName + val factTableName = carbonTable.getTableName + val partitionInfo = carbonTable.getPartitionInfo(factTableName) - override protected def getPartitions: Array[Partition] = { - val sc = alterPartitionModel.sqlContext.sparkContext - sc.setLocalProperty("spark.scheduler.pool", "DDL") - sc.setLocalProperty("spark.job.interruptOnCancel", "true") - firstParent[Array[AnyRef]].partitions - } - - override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { - val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) - val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava - val iter = new Iterator[(K, V)] { - val partitionId = partitionInfo.getPartitionId(split.index) - carbonLoadModel.setTaskNo(String.valueOf(partitionId)) - carbonLoadModel.setSegmentId(segmentId) - carbonLoadModel.setPartitionId("0") - CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true) + override protected def getPartitions: Array[Partition] = { + val sc = alterPartitionModel.sqlContext.sparkContext + sc.setLocalProperty("spark.scheduler.pool", "DDL") + sc.setLocalProperty("spark.job.interruptOnCancel", "true") + firstParent[Array[AnyRef]].partitions + } - val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, - factTableName, - carbonLoadModel.getTaskNo, - "0", - segmentId, - false, - true - ) + override def compute(split: Partition, context: TaskContext): Iterator[(K, V)] = { + val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) + val rows = firstParent[Array[AnyRef]].iterator(split, context).toList.asJava + val iter = new Iterator[(K, V)] { + val partitionId = partitionInfo.getPartitionId(split.index) + carbonLoadModel.setTaskNo(String.valueOf(partitionId)) + carbonLoadModel.setSegmentId(segmentId) + 
CommonUtil.setTempStoreLocation(split.index, carbonLoadModel, false, true) + val tempLocationKey = CarbonDataProcessorUtil + .getTempStoreLocationKey(carbonLoadModel.getDatabaseName, + carbonLoadModel.getTableName, + segmentId, + carbonLoadModel.getTaskNo, + false, + true) + // this property is used to determine whether temp location for carbon is inside + // container temp dir or is yarn application directory. + val carbonUseLocalDir = CarbonProperties.getInstance() + .getProperty("carbon.use.local.dir", "false") - val loadStatus = if (rows.isEmpty) { - LOGGER.info("After repartition this split, NO target rows to write back.") - true - } else { - val segmentProperties = PartitionUtils.getSegmentProperties(identifier, - segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable) - val processor = new RowResultProcessor( - carbonTable, - carbonLoadModel, - segmentProperties, - tempStoreLoc, - bucketId) - try { - processor.execute(rows) - } catch { - case e: Exception => - sys.error(s"Exception when executing Row result processor ${e.getMessage}") - } finally { - TableProcessingOperations - .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) - } - } + if (carbonUseLocalDir.equalsIgnoreCase("true")) { - val loadResult = segmentId - var finished = false + val storeLocations = Util.getConfiguredLocalDirs(SparkEnv.get.conf) + if (null != storeLocations && storeLocations.nonEmpty) { + storeLocation = storeLocations(Random.nextInt(storeLocations.length)) + } + if (storeLocation == null) { + storeLocation = System.getProperty("java.io.tmpdir") + } + } else { + storeLocation = System.getProperty("java.io.tmpdir") + } + storeLocation = storeLocation + '/' + System.nanoTime() + '/' + split.index + CarbonProperties.getInstance().addProperty(tempLocationKey, storeLocation) + LOGGER.info(s"Temp storeLocation taken is $storeLocation") - override def hasNext: Boolean = { - !finished - } + val tempStoreLoc = 
CarbonDataProcessorUtil.getLocalDataFolderLocation( + databaseName, factTableName, carbonLoadModel.getTaskNo, segmentId, false, true) - override def next(): (K, V) = { - finished = true - result.getKey(loadResult, loadStatus) - } + val loadStatus = if (rows.isEmpty) { + LOGGER.info("After repartition this split, NO target rows to write back.") + true + } else { + val segmentProperties = PartitionUtils.getSegmentProperties(identifier, + segmentId, partitionIds.toList, oldPartitionIds, partitionInfo, carbonTable) + val processor = new RowResultProcessor( + carbonTable, + carbonLoadModel, + segmentProperties, + tempStoreLoc, + bucketId) + try { + processor.execute(rows) + } catch { + case e: Exception => + sys.error(s"Exception when executing Row result processor ${ e.getMessage }") + } finally { + TableProcessingOperations + .deleteLocalDataLoadFolderLocation(carbonLoadModel, false, true) } - iter + } + + val loadResult = segmentId + var finished = false + + override def hasNext: Boolean = { + !finished + } + + override def next(): (K, V) = { + finished = true + result.getKey(loadResult, loadStatus) + } } + iter + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala index e0dcffd..ab3ab5d 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/CarbonMergerRDD.scala @@ -189,16 +189,9 @@ class CarbonMergerRDD[K, V]( } } - val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation(databaseName, - factTableName, - carbonLoadModel.getTaskNo, - "0", - 
mergeNumber, - true, - false - ) + val tempStoreLoc = CarbonDataProcessorUtil.getLocalDataFolderLocation( + databaseName, factTableName, carbonLoadModel.getTaskNo, mergeNumber, true, false) - carbonLoadModel.setPartitionId("0") var processor: AbstractResultProcessor = null if (restructuredBlockExists) { LOGGER.info("CompactionResultSortProcessor flow is selected") http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 72d0484..1fa1689 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -43,6 +43,7 @@ import org.apache.carbondata.core.constants.CarbonCommonConstants import org.apache.carbondata.core.datastore.compression.CompressorFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} import org.apache.carbondata.core.util.{CarbonProperties, CarbonTimeStatisticsFactory, ThreadLocalTaskInfo} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.processing.loading.{DataLoadExecutor, FailureCauses, TableProcessingOperations} import org.apache.carbondata.processing.loading.csvinput.{BlockDetails, CSVInputFormat, CSVRecordReaderIterator} import org.apache.carbondata.processing.loading.exception.NoRetryException @@ -129,7 +130,8 @@ class SparkPartitionLoader(model: CarbonLoadModel, System.setProperty("carbon.properties.filepath", System.getProperty("user.dir") + '/' + "conf" + '/' + "carbon.properties") } - 
CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo(model.getPartitionId) + CarbonTimeStatisticsFactory.getLoadStatisticsInstance.initPartitonInfo( + CarbonTablePath.DEPRECATED_PATITION_ID) CarbonProperties.getInstance().addProperty("carbon.is.columnar.storage", "true") CarbonProperties.getInstance().addProperty("carbon.dimension.split.value.in.columnar", "1") CarbonProperties.getInstance().addProperty("carbon.is.fullyfilled.bits", "true") @@ -219,14 +221,13 @@ class NewCarbonDataLoadRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - var partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") var model: CarbonLoadModel = _ val uniqueLoadStatusId = carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants @@ -264,7 +265,7 @@ class NewCarbonDataLoadRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } @@ -287,8 +288,8 @@ class NewCarbonDataLoadRDD[K, V]( val fileList: java.util.List[String] = new java.util.ArrayList[String]( CarbonCommonConstants.CONSTANT_SIZE_TEN) CarbonQueryUtil.splitFilePath(carbonLoadModel.getFactFilePath, fileList, ",") - model = carbonLoadModel.getCopyWithPartition(partitionID, fileList, - carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter) + model = carbonLoadModel.getCopyWithPartition( + carbonLoadModel.getCsvHeader, carbonLoadModel.getCsvDelimiter) StandardLogService.setThreadName(StandardLogService .getPartitionID(model.getCarbonDataLoadSchema.getCarbonTable.getTableUniqueName) , ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId + "") @@ -351,7 +352,6 @@ class NewDataFrameLoaderRDD[K, V]( val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel @@ -359,9 +359,8 @@ class NewDataFrameLoaderRDD[K, V]( carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) - carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) carbonLoadModel.setPreFetch(false) @@ -406,7 +405,7 @@ class NewDataFrameLoaderRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } var finished = false @@ -542,7 +541,6 @@ class PartitionTableDataLoaderRDD[K, V]( override def internalCompute(theSplit: Partition, context: TaskContext): Iterator[(K, V)] = { val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val iter = new Iterator[(K, V)] { - val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails() val executionErrors = new ExecutionErrors(FailureCauses.NONE, "") val model: CarbonLoadModel = carbonLoadModel @@ -552,9 +550,8 
@@ class PartitionTableDataLoaderRDD[K, V]( carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) - carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setTaskNo(String.valueOf(partitionInfo.getPartitionId(theSplit.index))) carbonLoadModel.setPreFetch(false) val recordReaders = Array[CarbonIterator[Array[AnyRef]]] { @@ -590,7 +587,7 @@ class PartitionTableDataLoaderRDD[K, V]( // So print the data load statistics only in case of non failure case if (SegmentStatus.LOAD_FAILURE != loadMetadataDetails.getSegmentStatus) { CarbonTimeStatisticsFactory.getLoadStatisticsInstance - .printStatisticsInfo(model.getPartitionId) + .printStatisticsInfo(CarbonTablePath.DEPRECATED_PATITION_ID) } } var finished = false http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala index 0498b25..3c871db 100644 --- a/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala +++ b/integration/spark-common/src/main/scala/org/apache/spark/util/PartitionUtils.scala @@ -190,8 +190,9 @@ object PartitionUtils { val batchNo = CarbonTablePath.DataFileUtil.getBatchNoFromTaskNo(taskNo) val taskId = CarbonTablePath.DataFileUtil.getTaskIdFromTaskNo(taskNo) val bucketNumber = CarbonTablePath.DataFileUtil.getBucketNo(path) - val indexFilePath = carbonTablePath.getCarbonIndexFilePath(String.valueOf(taskId), "0", - segmentId, batchNo, String.valueOf(bucketNumber), timestamp, version) + val indexFilePath = 
carbonTablePath.getCarbonIndexFilePath( + String.valueOf(taskId), segmentId, batchNo, String.valueOf(bucketNumber), + timestamp, version) // indexFilePath could be duplicated when multiple data file related to one index file if (indexFilePath != null && !pathList.contains(indexFilePath)) { pathList.add(indexFilePath) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala index 1695a13..09484c4 100644 --- a/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala +++ b/integration/spark2/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala @@ -594,7 +594,6 @@ object CarbonDataRDDFactory { override def getPartition(key: Any): Int = { val segId = key.asInstanceOf[String] - // partitionId segmentIdIndex(segId) * parallelism + Random.nextInt(parallelism) } } @@ -628,7 +627,6 @@ object CarbonDataRDDFactory { val rddResult = new updateResultImpl() val LOGGER = LogServiceFactory.getLogService(this.getClass.getName) val resultIter = new Iterator[(String, (LoadMetadataDetails, ExecutionErrors))] { - val partitionID = "0" val loadMetadataDetails = new LoadMetadataDetails val executionErrors = ExecutionErrors(FailureCauses.NONE, "") var uniqueLoadStatusId = "" @@ -639,10 +637,9 @@ object CarbonDataRDDFactory { CarbonCommonConstants.UNDERSCORE + (index + "_0") - loadMetadataDetails.setPartitionCount(partitionID) + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID) loadMetadataDetails.setLoadName(segId) loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE) - carbonLoadModel.setPartitionId(partitionID) 
carbonLoadModel.setSegmentId(segId) carbonLoadModel.setTaskNo(String.valueOf(index)) carbonLoadModel.setFactTimeStamp(updateModel.get.updatedTimeStamp) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala index bff65be..b4d3bea 100644 --- a/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala +++ b/integration/spark2/src/main/scala/org/apache/spark/sql/execution/datasources/CarbonFileFormat.scala @@ -110,7 +110,6 @@ with Serializable { model, conf ) - model.setPartitionId("0") model.setUseOnePass(options.getOrElse("onepass", "false").toBoolean) model.setDictionaryServerHost(options.getOrElse("dicthost", null)) model.setDictionaryServerPort(options.getOrElse("dictport", "-1").toInt) http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala index b5325ef..aadee81 100644 --- a/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala +++ b/integration/spark2/src/test/scala/org/apache/carbondata/spark/testsuite/partition/TestAlterPartitionTable.scala @@ -858,7 +858,7 @@ class TestAlterPartitionTable extends QueryTest with BeforeAndAfterAll { def 
getDataFiles(carbonTable: CarbonTable, segmentId: String): Array[CarbonFile] = { val tablePath = new CarbonTablePath(carbonTable.getCarbonTableIdentifier, carbonTable.getTablePath) - val segmentDir = tablePath.getCarbonDataDirectoryPath("0", segmentId) + val segmentDir = tablePath.getCarbonDataDirectoryPath(segmentId) val carbonFile = FileFactory.getCarbonFile(segmentDir, FileFactory.getFileType(segmentDir)) val dataFiles = carbonFile.listFiles(new CarbonFileFilter() { override def accept(file: CarbonFile): Boolean = { http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala index 102df39..9da7244 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/bucketing/TableBucketingTestCase.scala @@ -173,6 +173,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { case s: ShuffleExchange => shuffleExists = true } assert(!shuffleExists, "shuffle should not exist on bucket tables") + sql("DROP TABLE bucketed_parquet_table") } test("test create table with bucket join of carbon table and non bucket parquet table") { @@ -197,6 +198,7 @@ class TableBucketingTestCase extends Spark2QueryTest with BeforeAndAfterAll { case s: ShuffleExchange => shuffleExists = true } assert(shuffleExists, "shuffle should exist on non bucket tables") + sql("DROP TABLE parquet_table") } test("test scalar subquery with equal") { 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java index b7270b9..895fb79 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/CarbonDataLoadConfiguration.java @@ -40,8 +40,6 @@ public class CarbonDataLoadConfiguration { private String[] header; - private String partitionId; - private String segmentId; private String taskNo; @@ -194,14 +192,6 @@ public class CarbonDataLoadConfiguration { this.tableIdentifier = tableIdentifier; } - public String getPartitionId() { - return partitionId; - } - - public void setPartitionId(String partitionId) { - this.partitionId = partitionId; - } - public String getSegmentId() { return segmentId; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java index f5b29e7..fc2796a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/DataLoadProcessBuilder.java @@ -182,7 +182,6 @@ public final class DataLoadProcessBuilder { configuration.setTableIdentifier(identifier); 
configuration.setSchemaUpdatedTimeStamp(carbonTable.getTableLastUpdatedTime()); configuration.setHeader(loadModel.getCsvHeaderColumns()); - configuration.setPartitionId(loadModel.getPartitionId()); configuration.setSegmentId(loadModel.getSegmentId()); configuration.setTaskNo(loadModel.getTaskNo()); configuration.setDataLoadProperty(DataLoadProcessorConstants.COMPLEX_DELIMITERS, http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index e2be79c..a8db6c9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -63,8 +63,7 @@ public class TableProcessingOperations { //delete folder which metadata no exist in tablestatus for (int i = 0; i < carbonTable.getPartitionCount(); i++) { - final String partitionCount = i + ""; - String partitionPath = carbonTablePath.getPartitionDir(partitionCount); + String partitionPath = carbonTablePath.getPartitionDir(); FileFactory.FileType fileType = FileFactory.getFileType(partitionPath); if (FileFactory.isFileExist(partitionPath, fileType)) { CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java 
b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java index 6432d38..fcc88b5 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterImpl.java @@ -73,8 +73,8 @@ public class ParallelReadMergeSorterImpl extends AbstractMergeSorter { String[] storeLocations = CarbonDataProcessorUtil.getLocalDataFolderLocation( sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), sortParameters.getPartitionID(), - sortParameters.getSegmentId() + "", false, false); + String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), + false, false); // Set the data file location String[] dataFolderLocations = CarbonDataProcessorUtil.arrayAppend(storeLocations, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java index c7030dd..b7452a7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/ParallelReadMergeSorterWithBucketingImpl.java @@ -133,10 +133,10 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte } private SingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) { - String[] 
storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), bucketId, - sortParameters.getSegmentId() + "", false, false); + String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( + sortParameters.getDatabaseName(), sortParameters.getTableName(), + String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), + false, false); // Set the data file location String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -181,10 +181,9 @@ public class ParallelReadMergeSorterWithBucketingImpl extends AbstractMergeSorte } private void setTempLocation(SortParameters parameters) { - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(parameters.getDatabaseName(), - parameters.getTableName(), parameters.getTaskNo(), - parameters.getPartitionID(), parameters.getSegmentId(), false, false); + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation( + parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), + parameters.getSegmentId(), false, false); String[] tmpLocs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tmpLocs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java index 
c5579d9..ed3a55d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeBatchParallelReadMergeSorterImpl.java @@ -219,10 +219,9 @@ public class UnsafeBatchParallelReadMergeSorterImpl extends AbstractMergeSorter } private void setTempLocation(SortParameters parameters) { - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(parameters.getDatabaseName(), - parameters.getTableName(), parameters.getTaskNo(), batchCount + "", - parameters.getSegmentId(), false, false); + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation( + parameters.getDatabaseName(), parameters.getTableName(), parameters.getTaskNo(), + parameters.getSegmentId(), false, false); String[] tempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); parameters.setTempFileLocation(tempDirs); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java index 3c48e4d..f605b22 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/sort/impl/UnsafeParallelReadMergeSorterWithBucketingImpl.java @@ -119,18 +119,17 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends 
AbstractMerg Iterator<CarbonRowBatch>[] batchIterator = new Iterator[bucketingInfo.getNumberOfBuckets()]; for (int i = 0; i < sortDataRows.length; i++) { - batchIterator[i] = - new MergedDataIterator(String.valueOf(i), batchSize, intermediateFileMergers[i]); + batchIterator[i] = new MergedDataIterator(batchSize, intermediateFileMergers[i]); } return batchIterator; } - private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger(String bucketId) { - String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(sortParameters.getDatabaseName(), sortParameters.getTableName(), - String.valueOf(sortParameters.getTaskNo()), bucketId, - sortParameters.getSegmentId() + "", false, false); + private UnsafeSingleThreadFinalSortFilesMerger getFinalMerger() { + String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( + sortParameters.getDatabaseName(), sortParameters.getTableName(), + String.valueOf(sortParameters.getTaskNo()), sortParameters.getSegmentId(), + false, false); // Set the data file location String[] dataFolderLocation = CarbonDataProcessorUtil.arrayAppend(storeLocation, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -173,7 +172,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg private void setTempLocation(SortParameters parameters) { String[] carbonDataDirectoryPath = CarbonDataProcessorUtil .getLocalDataFolderLocation(parameters.getDatabaseName(), parameters.getTableName(), - parameters.getTaskNo(), parameters.getPartitionID(), parameters.getSegmentId(), + parameters.getTaskNo(), parameters.getSegmentId(), false, false); String[] tmpLoc = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -224,7 +223,6 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg private class MergedDataIterator extends CarbonIterator<CarbonRowBatch> { - private String partitionId; private 
int batchSize; @@ -232,9 +230,8 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg private UnsafeIntermediateMerger intermediateMerger; - public MergedDataIterator(String partitionId, int batchSize, + public MergedDataIterator(int batchSize, UnsafeIntermediateMerger intermediateMerger) { - this.partitionId = partitionId; this.batchSize = batchSize; this.intermediateMerger = intermediateMerger; this.firstRow = true; @@ -245,7 +242,7 @@ public class UnsafeParallelReadMergeSorterWithBucketingImpl extends AbstractMerg @Override public boolean hasNext() { if (firstRow) { firstRow = false; - finalMerger = getFinalMerger(partitionId); + finalMerger = getFinalMerger(); List<UnsafeCarbonRowPage> rowPages = intermediateMerger.getRowPages(); finalMerger.startFinalMerge(rowPages.toArray(new UnsafeCarbonRowPage[rowPages.size()]), intermediateMerger.getMergedPages()); http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index 8b87cfc..6cf1dcd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; 
import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; @@ -88,11 +89,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false, false); + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { + String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( + tableIdentifier.getDatabaseName(), + tableIdentifier.getTableName(), + String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), + false, + false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } @@ -115,11 +118,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces measureCount = configuration.getMeasureCount(); outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 
1 : 0) + 1; CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); if (iterators.length == 1) { - doExecute(iterators[0], 0, 0); + doExecute(iterators[0], 0); } else { executorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier() @@ -150,11 +153,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces return null; } - private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) { - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); - CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, - iteratorIndex); + private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) { + String[] storeLocation = getStoreLocation(tableIdentifier); + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel( + configuration, storeLocation, 0, iteratorIndex); CarbonFactHandler dataHandler = null; boolean rowsNotExist = true; while (iterator.hasNext()) { @@ -189,10 +191,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + 
.recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, + System.currentTimeMillis()); } private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { @@ -298,7 +301,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } @Override public void run() { - doExecute(this.iterator, 0, iteratorIndex); + doExecute(this.iterator, iteratorIndex); } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/bf3602fc/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java index f030d52..369c1f2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java @@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; @@ -59,13 +60,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String[] storeLocation = CarbonDataProcessorUtil - 
.getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false, false); - CarbonDataProcessorUtil.createLocations(storeLocation); - return storeLocation; + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { + return CarbonDataProcessorUtil.getLocalDataFolderLocation( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), + String.valueOf(configuration.getTaskNo()), + configuration.getSegmentId(), false, false); } @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { @@ -75,18 +74,19 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS String tableName = tableIdentifier.getTableName(); try { CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); int i = 0; + String[] storeLocation = getStoreLocation(tableIdentifier); + CarbonDataProcessorUtil.createLocations(storeLocation); for (Iterator<CarbonRowBatch> iterator : iterators) { - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); int k = 0; while (iterator.hasNext()) { CarbonRowBatch next = iterator.next(); // If no rows from merge sorter, then don't create a file in fact column handler if (next.hasNext()) { CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++); + .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++); CarbonFactHandler dataHandler = CarbonFactHandlerFactory .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); dataHandler.initialise(); @@ -119,10 +119,11 @@ public class DataWriterBatchProcessorStepImpl extends 
AbstractDataLoadProcessorS CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, + System.currentTimeMillis()); } private void processingComplete(CarbonFactHandler dataHandler) {