http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index 8b87cfc..6cf1dcd 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -32,6 +32,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; @@ -88,11 +89,13 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false, false); + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { + String[] storeLocation = CarbonDataProcessorUtil.getLocalDataFolderLocation( + tableIdentifier.getDatabaseName(), + tableIdentifier.getTableName(), + String.valueOf(configuration.getTaskNo()), configuration.getSegmentId(), + false, + false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } @@ -115,11 +118,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces measureCount = configuration.getMeasureCount(); outputLength = measureCount + (this.noDictWithComplextCount > 0 ? 1 : 0) + 1; CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); if (iterators.length == 1) { - doExecute(iterators[0], 0, 0); + doExecute(iterators[0], 0); } else { executorService = Executors.newFixedThreadPool(iterators.length, new CarbonThreadFactory("NoSortDataWriterPool:" + configuration.getTableIdentifier() @@ -150,11 +153,10 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces return null; } - private void doExecute(Iterator<CarbonRowBatch> iterator, int partitionId, int iteratorIndex) { - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); - CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, partitionId, - iteratorIndex); + private void doExecute(Iterator<CarbonRowBatch> iterator, int iteratorIndex) { + String[] storeLocation = getStoreLocation(tableIdentifier); + CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel( + configuration, storeLocation, 0, iteratorIndex); CarbonFactHandler dataHandler = null; boolean rowsNotExist = true; while (iterator.hasNext()) { @@ -189,10 +191,11 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, + System.currentTimeMillis()); } private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { @@ -298,7 +301,7 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } @Override public void run() { - doExecute(this.iterator, 0, iteratorIndex); + doExecute(this.iterator, iteratorIndex); } } }
http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java index f030d52..369c1f2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java @@ -24,6 +24,7 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; @@ -59,13 +60,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { - String[] storeLocation = CarbonDataProcessorUtil - .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false, false); - CarbonDataProcessorUtil.createLocations(storeLocation); - return storeLocation; + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { + return CarbonDataProcessorUtil.getLocalDataFolderLocation( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), + String.valueOf(configuration.getTaskNo()), + configuration.getSegmentId(), false, false); } @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { @@ -75,18 +74,19 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS String tableName = tableIdentifier.getTableName(); try { CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); int i = 0; + String[] storeLocation = getStoreLocation(tableIdentifier); + CarbonDataProcessorUtil.createLocations(storeLocation); for (Iterator<CarbonRowBatch> iterator : iterators) { - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); int k = 0; while (iterator.hasNext()) { CarbonRowBatch next = iterator.next(); // If no rows from merge sorter, then don't create a file in fact column handler if (next.hasNext()) { CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel - .createCarbonFactDataHandlerModel(configuration, storeLocation, i, k++); + .createCarbonFactDataHandlerModel(configuration, storeLocation, 0, k++); CarbonFactHandler dataHandler = CarbonFactHandlerFactory .createCarbonFactHandler(model, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); dataHandler.initialise(); @@ -119,10 +119,11 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, + System.currentTimeMillis()); } private void processingComplete(CarbonFactHandler dataHandler) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index d321405..58009af 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.keygenerator.KeyGenException; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonTimeStatisticsFactory; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.AbstractDataLoadProcessorStep; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.loading.DataField; @@ -65,21 +66,21 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { child.initialize(); } - private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier, String partitionId) { + private String[] getStoreLocation(CarbonTableIdentifier tableIdentifier) { String[] storeLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), partitionId, - configuration.getSegmentId() + "", false, false); + tableIdentifier.getTableName(), String.valueOf(configuration.getTaskNo()), + configuration.getSegmentId(), false, false); CarbonDataProcessorUtil.createLocations(storeLocation); return storeLocation; } - public CarbonFactDataHandlerModel getDataHandlerModel(int partitionId) { + public CarbonFactDataHandlerModel getDataHandlerModel() { CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(partitionId)); + String[] storeLocation = getStoreLocation(tableIdentifier); return CarbonFactDataHandlerModel.createCarbonFactDataHandlerModel(configuration, - storeLocation, partitionId, 0); + storeLocation, 0, 0); } @Override public Iterator<CarbonRowBatch>[] execute() throws CarbonDataLoadingException { @@ -89,11 +90,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { String tableName = tableIdentifier.getTableName(); try { CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); int i = 0; for (Iterator<CarbonRowBatch> iterator : iterators) { - String[] storeLocation = getStoreLocation(tableIdentifier, String.valueOf(i)); + String[] storeLocation = getStoreLocation(tableIdentifier); CarbonFactDataHandlerModel model = CarbonFactDataHandlerModel .createCarbonFactDataHandlerModel(configuration, storeLocation, i, 0); @@ -147,10 +148,11 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); processingComplete(dataHandler); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordDictionaryValue2MdkAdd2FileTime(configuration.getPartitionId(), + .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() - .recordMdkGenerateTotalTime(configuration.getPartitionId(), System.currentTimeMillis()); + .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, + System.currentTimeMillis()); } private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index be27866..0eadc7f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -319,7 +319,7 @@ public final class CarbonDataMergerUtil { // create entry for merged one. LoadMetadataDetails loadMetadataDetails = new LoadMetadataDetails(); - loadMetadataDetails.setPartitionCount(carbonLoadModel.getPartitionId()); + loadMetadataDetails.setPartitionCount(CarbonTablePath.DEPRECATED_PATITION_ID); loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS); long loadEnddate = CarbonUpdateUtil.readCurrentTime(); loadMetadataDetails.setLoadEndTime(loadEnddate); @@ -676,7 +676,7 @@ public final class CarbonDataMergerUtil { CarbonTableIdentifier carbonTableIdentifier, String segmentId) { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier); - return carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + return carbonTablePath.getCarbonDataDirectoryPath(segmentId); } @@ -1036,7 +1036,7 @@ public final class CarbonDataMergerUtil { CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath("0", seg); + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index 2480a39..ff65db2 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -372,7 +372,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { return SortParameters .createSortParameters(carbonTable, carbonLoadModel.getDatabaseName(), tableName, dimensionColumnCount, segmentProperties.getComplexDimensions().size(), measureCount, - noDictionaryCount, carbonLoadModel.getPartitionId(), segmentId, + noDictionaryCount, segmentId, carbonLoadModel.getTaskNo(), noDictionaryColMapping, true); } @@ -422,7 +422,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { private void initTempStoreLocation() { tempStoreLocation = CarbonDataProcessorUtil .getLocalDataFolderLocation(carbonLoadModel.getDatabaseName(), tableName, - carbonLoadModel.getTaskNo(), carbonLoadModel.getPartitionId(), segmentId, + carbonLoadModel.getTaskNo(), segmentId, true, false); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java index 98d150e..4d31f87 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java +++ b/processing/src/main/java/org/apache/carbondata/processing/sort/sortdata/SortParameters.java @@ -26,6 +26,7 @@ import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.util.CarbonProperties; +import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.CarbonDataLoadConfiguration; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -356,7 +357,7 @@ public class SortParameters implements Serializable { CarbonProperties carbonProperties = CarbonProperties.getInstance(); parameters.setDatabaseName(tableIdentifier.getDatabaseName()); parameters.setTableName(tableIdentifier.getTableName()); - parameters.setPartitionID(configuration.getPartitionId()); + parameters.setPartitionID("0"); parameters.setSegmentId(configuration.getSegmentId()); parameters.setTaskNo(configuration.getTaskNo()); parameters.setMeasureColCount(configuration.getMeasureCount()); @@ -392,10 +393,9 @@ public class SortParameters implements Serializable { LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize()); - String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(tableIdentifier.getDatabaseName(), - tableIdentifier.getTableName(), configuration.getTaskNo(), - configuration.getPartitionId(), configuration.getSegmentId(), false, false); + String[] carbonDataDirectoryPath = CarbonDataProcessorUtil.getLocalDataFolderLocation( + tableIdentifier.getDatabaseName(), tableIdentifier.getTableName(), + configuration.getTaskNo(), configuration.getSegmentId(), false, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); @@ -448,13 +448,13 @@ public class SortParameters implements Serializable { public static SortParameters createSortParameters(CarbonTable carbonTable, String databaseName, String tableName, int dimColCount, int complexDimColCount, int measureColCount, - int noDictionaryCount, String partitionID, String segmentId, String taskNo, + int noDictionaryCount, String segmentId, String taskNo, boolean[] noDictionaryColMaping, boolean isCompactionFlow) { SortParameters parameters = new SortParameters(); CarbonProperties carbonProperties = CarbonProperties.getInstance(); parameters.setDatabaseName(databaseName); parameters.setTableName(tableName); - parameters.setPartitionID(partitionID); + parameters.setPartitionID(CarbonTablePath.DEPRECATED_PATITION_ID); parameters.setSegmentId(segmentId); parameters.setTaskNo(taskNo); parameters.setMeasureColCount(measureColCount); @@ -486,7 +486,7 @@ public class SortParameters implements Serializable { LOGGER.info("File Buffer Size: " + parameters.getFileBufferSize()); String[] carbonDataDirectoryPath = CarbonDataProcessorUtil - .getLocalDataFolderLocation(databaseName, tableName, taskNo, partitionID, segmentId, + .getLocalDataFolderLocation(databaseName, tableName, taskNo, segmentId, isCompactionFlow, false); String[] sortTempDirs = CarbonDataProcessorUtil.arrayAppend(carbonDataDirectoryPath, File.separator, CarbonCommonConstants.SORT_TEMP_FILE_LOCATION); http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index d15152c..9f3c86f 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -309,7 +309,7 @@ public class CarbonFactDataHandlerModel { carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes); String carbonDataDirectoryPath = CarbonDataProcessorUtil .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getPartitionId(), loadModel.getSegmentId()); + tableName, loadModel.getSegmentId()); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; @@ -336,9 +336,8 @@ public class CarbonFactDataHandlerModel { private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) { AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String carbonDataDirectoryPath = carbonTablePath - .getCarbonDataDirectoryPath(configuration.getPartitionId(), - configuration.getSegmentId() + ""); + String carbonDataDirectoryPath = + carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId()); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index 2a4cc00..cfe6e31 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -123,13 +123,11 @@ public final class CarbonDataProcessorUtil { * @param databaseName * @param tableName * @param taskId - * @param partitionId * @param segmentId * @return */ public static String[] getLocalDataFolderLocation(String databaseName, String tableName, - String taskId, String partitionId, String segmentId, boolean isCompactionFlow, - boolean isAltPartitionFlow) { + String taskId, String segmentId, boolean isCompactionFlow, boolean isAltPartitionFlow) { String tempLocationKey = getTempStoreLocationKey(databaseName, tableName, segmentId, taskId, isCompactionFlow, isAltPartitionFlow); @@ -150,8 +148,7 @@ public final class CarbonDataProcessorUtil { String tmpStore = baseTmpStorePathArray[i]; CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier()); - String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId + ""); + String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId; } @@ -378,13 +375,12 @@ public final class CarbonDataProcessorUtil { * @return data directory path */ public static String checkAndCreateCarbonStoreLocation(String factStoreLocation, - String databaseName, String tableName, String partitionId, String segmentId) { + String databaseName, String tableName, String segmentId) { CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); - String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(partitionId, segmentId); + String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 00f13a5..083128d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -77,10 +77,8 @@ public final class CarbonLoaderUtil { CarbonTablePath carbonTablePath = CarbonStorePath .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); - for (int i = 0; i < carbonTable.getPartitionCount(); i++) { - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", currentLoad + ""); - deleteStorePath(segmentPath); - } + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + ""); + deleteStorePath(segmentPath); } /** @@ -100,7 +98,7 @@ public final class CarbonLoaderUtil { int fileCount = 0; int partitionCount = carbonTable.getPartitionCount(); for (int i = 0; i < partitionCount; i++) { - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(i + "", + String segmentPath = carbonTablePath.getCarbonDataDirectoryPath( currentLoad + ""); CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); @@ -295,7 +293,7 @@ public final class CarbonLoaderUtil { private static void addToStaleFolders(CarbonTablePath carbonTablePath, List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException { - String path = carbonTablePath.getCarbonDataDirectoryPath("0", entry.getLoadName()); + String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName()); // add to the deletion list only if file exist else HDFS file system will throw // exception while deleting the file if file path does not exist if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { @@ -876,7 +874,7 @@ public final class CarbonLoaderUtil { CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier); - String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath("0", segmentId); + String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId); CarbonUtil.checkAndCreateFolder(segmentFolder); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index 02ab1d8..f9f3e20 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -48,15 +48,14 @@ public final class DeleteLoadFolders { * returns segment path * * @param absoluteTableIdentifier - * @param partitionId * @param oneLoad * @return */ private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier, - int partitionId, LoadMetadataDetails oneLoad) { + LoadMetadataDetails oneLoad) { CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String segmentId = oneLoad.getLoadName(); - return carbon.getCarbonDataDirectoryPath("" + partitionId, segmentId); + return carbon.getCarbonDataDirectoryPath(segmentId); } public static void physicalFactAndMeasureMetadataDeletion( @@ -64,7 +63,7 @@ public final class DeleteLoadFolders { LoadMetadataDetails[] currentDetails = SegmentStatusManager.readLoadMetadata(metadataPath); for (LoadMetadataDetails oneLoad : currentDetails) { if (checkIfLoadCanBeDeletedPhysically(oneLoad, isForceDelete)) { - String path = getSegmentPath(absoluteTableIdentifier, 0, oneLoad); + String path = getSegmentPath(absoluteTableIdentifier, oneLoad); boolean status = false; try { if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index e662757..7f0aef6 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -163,7 +163,6 @@ public class StoreCreator { loadModel.setCsvHeaderColumns(loadModel.getCsvHeader().split(",")); loadModel.setTaskNo("0"); loadModel.setSegmentId("0"); - loadModel.setPartitionId("0"); loadModel.setFactTimeStamp(System.currentTimeMillis()); loadModel.setMaxColumns("10"); http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 41dfa50..9088731 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -107,7 +107,6 @@ class StreamHandoffRDD[K, V]( split: Partition, context: TaskContext ): Iterator[(K, V)] = { - carbonLoadModel.setPartitionId("0") carbonLoadModel.setTaskNo("" + split.index) val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable CarbonMetadata.getInstance().addCarbonTable(carbonTable) http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index 1c7be6a..f2274be 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -135,7 +135,7 @@ object StreamSinkFactory { FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType) } val segmentId = StreamSegment.open(carbonTable) - val segmentDir = carbonTablePath.getSegmentDir("0", segmentId) + val segmentDir = carbonTablePath.getSegmentDir(segmentId) if (FileFactory.isFileExist(segmentDir, fileType)) { // recover fault StreamSegment.recoverSegmentIfRequired(segmentDir) http://git-wip-us.apache.org/repos/asf/carbondata/blob/aa2f075b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 849bf99..45bc19a 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -149,12 +149,12 @@ class CarbonAppendableStreamSink( * if the directory size of current segment beyond the threshold, hand off new segment */ private def checkOrHandOffSegment(): Unit = { - val segmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId) + val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId) val fileType = FileFactory.getFileType(segmentDir) if (segmentMaxSize <= StreamSegment.size(segmentDir)) { val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) currentSegmentId = newSegmentId - val newSegmentDir = carbonTablePath.getSegmentDir("0", currentSegmentId) + val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId) FileFactory.mkdirs(newSegmentDir, fileType) // TODO trigger hand off operation @@ -251,14 +251,14 @@ object CarbonAppendableStreamSink { // update data file info in index file val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - StreamSegment.updateIndexFile(tablePath.getSegmentDir("0", segmentId)) + StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId)) } catch { // catch fault of executor side case t: Throwable => val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = tablePath.getSegmentDir("0", segmentId) + val segmentDir = tablePath.getSegmentDir(segmentId) StreamSegment.recoverSegmentIfRequired(segmentDir) LOGGER.error(t, s"Aborting job ${ job.getJobID }.") committer.abortJob(job)