http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala index 44204d4..f5a90de 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/TestStreamingTableOperation.scala @@ -32,8 +32,10 @@ import org.apache.spark.sql.types.StructType import org.scalatest.BeforeAndAfterAll import org.apache.carbondata.core.constants.CarbonCommonConstants +import org.apache.carbondata.core.datastore.impl.FileFactory +import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.statusmanager.{FileFormat, SegmentStatus} -import org.apache.carbondata.core.util.path.{CarbonStorePath, CarbonTablePath} +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.spark.exception.MalformedCarbonCommandException class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { @@ -189,7 +191,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("batch_table", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket @@ -197,7 +198,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { thread1.start() // use thread pool to catch the exception of sink thread val pool = Executors.newSingleThreadExecutor() - val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier) + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier) val future = pool.submit(thread2) Thread.sleep(1000) thread1.interrupt() @@ -220,11 +221,10 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("stream_table_file", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) val csvDataDir = new File("target/csvdata").getCanonicalPath // streaming ingest 10 rows generateCSVDataFile(spark, idStart = 10, rowNums = 10, csvDataDir) - val thread = createFileStreamingThread(spark, tablePath, csvDataDir, intervalSecond = 1, + val thread = createFileStreamingThread(spark, carbonTable, csvDataDir, intervalSecond = 1, identifier) thread.start() Thread.sleep(2000) @@ -646,12 +646,11 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier("stream_table_drop", Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket val thread1 = createWriteSocketThread(server, 2, 10, 3) - val thread2 = createSocketStreamingThread(spark, server.getLocalPort, tablePath, identifier, "force", 5, 1024L * 200, false) + val thread2 = createSocketStreamingThread(spark, server.getLocalPort, carbonTable, identifier, "force", 5, 1024L * 200, false) thread1.start() thread2.start() Thread.sleep(1000) @@ -749,7 +748,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { def createSocketStreamingThread( spark: SparkSession, port: Int, - tablePath: CarbonTablePath, + carbonTable: CarbonTable, tableIdentifier: TableIdentifier, badRecordAction: String = "force", intervalSecond: Int = 2, @@ -770,7 +769,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"$intervalSecond seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) .option("bad_records_action", badRecordAction) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) @@ -808,7 +807,6 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val identifier = new TableIdentifier(tableName, Option("streaming")) val carbonTable = CarbonEnv.getInstance(spark).carbonMetastore.lookupRelation(identifier)(spark) .asInstanceOf[CarbonRelation].metaData.carbonTable - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) var server: ServerSocket = null try { server = getServerSocket() @@ -821,7 +819,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { val thread2 = createSocketStreamingThread( spark = spark, port = server.getLocalPort, - tablePath = tablePath, + carbonTable = carbonTable, tableIdentifier = identifier, badRecordAction = badRecordAction, intervalSecond = intervalOfIngest, @@ -863,7 +861,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { def createFileStreamingThread( spark: SparkSession, - tablePath: CarbonTablePath, + carbonTable: CarbonTable, csvDataDir: String, intervalSecond: Int, tableIdentifier: TableIdentifier): Thread = { @@ -889,7 +887,7 @@ class TestStreamingTableOperation extends QueryTest with BeforeAndAfterAll { qry = readSocketDF.writeStream .format("carbondata") .trigger(ProcessingTime(s"${ intervalSecond } seconds")) - .option("checkpointLocation", tablePath.getStreamingCheckpointDir) + .option("checkpointLocation", CarbonTablePath.getStreamingCheckpointDir(carbonTable.getTablePath)) .option("dbName", tableIdentifier.database.get) .option("tableName", tableIdentifier.table) .start()
http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala ---------------------------------------------------------------------- diff --git a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala index 9a6efbe..97dc8ba 100644 --- a/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala +++ b/integration/spark2/src/test/scala/org/apache/spark/carbondata/restructure/AlterTableRevertTestCase.scala @@ -92,7 +92,7 @@ class AlterTableRevertTestCase extends Spark2QueryTest with BeforeAndAfterAll { } val carbonTable = CarbonMetadata.getInstance.getCarbonTable("default", "reverttest") - assert(new File(carbonTable.getMetaDataFilepath).listFiles().length < 6) + assert(new File(carbonTable.getMetadataPath).listFiles().length < 6) } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java index a8db6c9..bbc3697 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java @@ -34,7 +34,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonThreadFactory; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; @@ -56,43 +55,39 @@ public class TableProcessingOperations { */ public static void deletePartialLoadDataIfExist(CarbonTable carbonTable, final boolean isCompactionFlow) throws IOException { - String metaDataLocation = carbonTable.getMetaDataFilepath(); + String metaDataLocation = carbonTable.getMetadataPath(); final LoadMetadataDetails[] details = SegmentStatusManager.readLoadMetadata(metaDataLocation); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getTablePath(), carbonTable.getCarbonTableIdentifier()); //delete folder which metadata no exist in tablestatus - for (int i = 0; i < carbonTable.getPartitionCount(); i++) { - String partitionPath = carbonTablePath.getPartitionDir(); - FileFactory.FileType fileType = FileFactory.getFileType(partitionPath); - if (FileFactory.isFileExist(partitionPath, fileType)) { - CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType); - CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { - @Override public boolean accept(CarbonFile path) { - String segmentId = - CarbonTablePath.DataPathUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); - boolean found = false; - for (int j = 0; j < details.length; j++) { - if (details[j].getLoadName().equals(segmentId)) { - found = true; - break; - } - } - return !found; - } - }); - for (int k = 0; k < listFiles.length; k++) { + String partitionPath = CarbonTablePath.getPartitionDir(carbonTable.getTablePath()); + FileFactory.FileType fileType = FileFactory.getFileType(partitionPath); + if (FileFactory.isFileExist(partitionPath, fileType)) { + CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath, fileType); + CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() { + @Override public boolean accept(CarbonFile path) { String segmentId = - CarbonTablePath.DataPathUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy"); - if (isCompactionFlow) { - if (segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); - } - } else { - if (!segmentId.contains(".")) { - CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + CarbonTablePath.DataFileUtil.getSegmentId(path.getAbsolutePath() + "/dummy"); + boolean found = false; + for (int j = 0; j < details.length; j++) { + if (details[j].getLoadName().equals(segmentId)) { + found = true; + break; } } + return !found; + } + }); + for (int k = 0; k < listFiles.length; k++) { + String segmentId = + CarbonTablePath.DataFileUtil.getSegmentId(listFiles[k].getAbsolutePath() + "/dummy"); + if (isCompactionFlow) { + if (segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } + } else { + if (!segmentId.contains(".")) { + CarbonLoaderUtil.deleteStorePath(listFiles[k].getAbsolutePath()); + } } } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java index 4cd5014..193d192 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/converter/impl/FieldEncoderFactory.java @@ -34,8 +34,6 @@ import org.apache.carbondata.core.metadata.schema.table.RelationIdentifier; import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn; import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.ParentColumnTableRelation; -import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -105,12 +103,11 @@ public class FieldEncoderFactory { ColumnIdentifier parentColumnIdentifier = new ColumnIdentifier(parentColumnTableRelation.getColumnId(), null, dataField.getColumn().getDataType()); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); AbsoluteTableIdentifier parentAbsoluteTableIdentifier = AbsoluteTableIdentifier.from( - CarbonUtil.getNewTablePath(carbonTablePath, parentTableIdentifier.getTableName()), - parentTableIdentifier); + CarbonTablePath.getNewTablePath( + absoluteTableIdentifier.getTablePath(), parentTableIdentifier.getTableName()), + parentTableIdentifier); identifier = new DictionaryColumnUniqueIdentifier(parentAbsoluteTableIdentifier, parentColumnIdentifier, dataField.getColumn().getDataType()); return new DictionaryFieldConverterImpl(dataField, cache, parentAbsoluteTableIdentifier, http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java index d3caa99..a08177a 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/AbstractResultProcessor.java @@ -19,10 +19,8 @@ package org.apache.carbondata.processing.merger; import java.util.List; -import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.scan.result.iterator.RawResultIterator; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.store.CarbonDataFileAttributes; import org.apache.carbondata.processing.store.CarbonFactDataHandlerModel; @@ -42,13 +40,11 @@ public abstract class AbstractResultProcessor { public abstract boolean execute(List<RawResultIterator> resultIteratorList); protected void setDataFileAttributesInModel(CarbonLoadModel loadModel, - CompactionType compactionType, CarbonTable carbonTable, - CarbonFactDataHandlerModel carbonFactDataHandlerModel) { + CompactionType compactionType, CarbonFactDataHandlerModel carbonFactDataHandlerModel) { CarbonDataFileAttributes carbonDataFileAttributes; if (compactionType == CompactionType.IUD_UPDDEL_DELTA) { long taskNo = CarbonUpdateUtil.getLatestTaskIdForSegment(loadModel.getSegmentId(), - CarbonStorePath.getCarbonTablePath(loadModel.getTablePath(), - carbonTable.getCarbonTableIdentifier())); + loadModel.getTablePath()); // Increase the Task Index as in IUD_UPDDEL_DELTA_COMPACTION the new file will // be written in same segment. So the TaskNo should be incremented by 1 from max val. long index = taskNo + 1; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java index d796262..4fa4ff4 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonCompactionUtil.java @@ -263,7 +263,7 @@ public class CarbonCompactionUtil { public static CarbonTable getNextTableToCompact(CarbonTable[] carbonTables, List<CarbonTableIdentifier> skipList) { for (CarbonTable ctable : carbonTables) { - String metadataPath = ctable.getMetaDataFilepath(); + String metadataPath = ctable.getMetadataPath(); // check for the compaction required file and at the same time exclude the tables which are // present in the skip list. if (CarbonCompactionUtil.isCompactionRequiredForTable(metadataPath) && !skipList http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java index 0eadc7f..c43dbf9 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CarbonDataMergerUtil.java @@ -31,7 +31,6 @@ import org.apache.carbondata.core.datastore.filesystem.CarbonFileFilter; import org.apache.carbondata.core.datastore.impl.FileFactory; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.mutate.DeleteDeltaBlockDetails; @@ -42,7 +41,6 @@ import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.statusmanager.SegmentUpdateStatusManager; import org.apache.carbondata.core.util.CarbonProperties; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDeleteDeltaWriterImpl; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; @@ -167,15 +165,13 @@ public final class CarbonDataMergerUtil { // End Timestamp. // Table Update Status Metadata Update. - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - SegmentUpdateStatusManager segmentUpdateStatusManager = - new SegmentUpdateStatusManager(absoluteTableIdentifier); + new SegmentUpdateStatusManager(identifier); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock updateLock = segmentUpdateStatusManager.getTableUpdateStatusLock(); ICarbonLock statusLock = segmentStatusManager.getTableStatusLock(); @@ -222,7 +218,7 @@ public final class CarbonDataMergerUtil { } LoadMetadataDetails[] loadDetails = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadDetail : loadDetails) { if (loadsToMerge.contains(loadDetail)) { @@ -235,18 +231,18 @@ public final class CarbonDataMergerUtil { } } - segmentUpdateStatusManager - .writeLoadDetailsIntoFile(Arrays.asList(updateLists), timestamp); - segmentStatusManager - .writeLoadDetailsIntoFile(carbonTablePath.getTableStatusFilePath(), loadDetails); + segmentUpdateStatusManager.writeLoadDetailsIntoFile( + Arrays.asList(updateLists), timestamp); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()), loadDetails); status = true; } else { LOGGER.error("Not able to acquire the lock."); status = false; } } catch (IOException e) { - LOGGER.error("Error while updating metadata. The metadata file path is " + carbonTablePath - .getMetadataDirectoryPath()); + LOGGER.error("Error while updating metadata. The metadata file path is " + + CarbonTablePath.getMetadataPath(identifier.getTablePath())); status = false; } finally { @@ -282,9 +278,9 @@ public final class CarbonDataMergerUtil { String metaDataFilepath, String mergedLoadNumber, CarbonLoadModel carbonLoadModel, CompactionType compactionType) throws IOException { boolean tableStatusUpdationStatus = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -293,10 +289,7 @@ public final class CarbonDataMergerUtil { LOGGER.info("Acquired lock for the table " + carbonLoadModel.getDatabaseName() + "." + carbonLoadModel.getTableName() + " for table status updation "); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); - - String statusFilePath = carbonTablePath.getTableStatusFilePath(); + String statusFilePath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); LoadMetadataDetails[] loadDetails = SegmentStatusManager.readLoadMetadata(metaDataFilepath); @@ -595,10 +588,6 @@ public final class CarbonDataMergerUtil { List<LoadMetadataDetails> segmentsToBeMerged = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); - CarbonTableIdentifier tableIdentifier = - carbonLoadModel.getCarbonDataLoadSchema().getCarbonTable().getCarbonTableIdentifier(); - - // total length long totalLength = 0; @@ -613,7 +602,7 @@ public final class CarbonDataMergerUtil { String segId = segment.getLoadName(); // variable to store one segment size across partition. long sizeOfOneSegmentAcrossPartition = - getSizeOfSegment(tablePath, tableIdentifier, segId); + getSizeOfSegment(tablePath, segId); // if size of a segment is greater than the Major compaction size. then ignore it. if (sizeOfOneSegmentAcrossPartition > (compactionSize * 1024 * 1024)) { @@ -652,35 +641,17 @@ public final class CarbonDataMergerUtil { /** * For calculating the size of the specified segment * @param tablePath the store path of the segment - * @param tableIdentifier identifier of table that the segment belong to * @param segId segment id * @return the data size of the segment */ - private static long getSizeOfSegment(String tablePath, - CarbonTableIdentifier tableIdentifier, String segId) { - String loadPath = getStoreLocation(tablePath, tableIdentifier, segId); + private static long getSizeOfSegment(String tablePath, String segId) { + String loadPath = CarbonTablePath.getSegmentPath(tablePath, segId); CarbonFile segmentFolder = FileFactory.getCarbonFile(loadPath, FileFactory.getFileType(loadPath)); return getSizeOfFactFileInLoad(segmentFolder); } /** - * This method will get the store location for the given path, segemnt id and partition id - * - * @param tablePath - * @param carbonTableIdentifier identifier of catbon table that the segment belong to - * @param segmentId segment id - * @return the store location of the segment - */ - private static String getStoreLocation(String tablePath, - CarbonTableIdentifier carbonTableIdentifier, String segmentId) { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tablePath, carbonTableIdentifier); - return carbonTablePath.getCarbonDataDirectoryPath(segmentId); - } - - - /** * Identify the segments to be merged based on the segment count * * @param listOfSegmentsAfterPreserve the list of segments after @@ -1022,21 +993,19 @@ public final class CarbonDataMergerUtil { * if UpdateDelta Files are more than IUD Compaction threshold. * * @param seg - * @param absoluteTableIdentifier + * @param identifier * @param segmentUpdateStatusManager * @param numberDeltaFilesThreshold * @return */ public static Boolean checkUpdateDeltaFilesInSeg(String seg, - AbsoluteTableIdentifier absoluteTableIdentifier, + AbsoluteTableIdentifier identifier, SegmentUpdateStatusManager segmentUpdateStatusManager, int numberDeltaFilesThreshold) { CarbonFile[] updateDeltaFiles = null; Set<String> uniqueBlocks = new HashSet<String>(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(seg); + String segmentPath = CarbonTablePath.getSegmentPath(identifier.getTablePath(), seg); CarbonFile segDir = FileFactory.getCarbonFile(segmentPath, FileFactory.getFileType(segmentPath)); CarbonFile[] allSegmentFiles = segDir.listFiles(); @@ -1282,15 +1251,12 @@ public final class CarbonDataMergerUtil { CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, table, timestamp, true); // Update the Table Status. - String metaDataFilepath = table.getMetaDataFilepath(); - AbsoluteTableIdentifier absoluteTableIdentifier = table.getAbsoluteTableIdentifier(); - - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier); + String metaDataFilepath = table.getMetadataPath(); + AbsoluteTableIdentifier identifier = table.getAbsoluteTableIdentifier(); - String tableStatusPath = carbonTablePath.getTableStatusFilePath(); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -1304,7 +1270,7 @@ public final class CarbonDataMergerUtil { + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - segmentStatusManager.readLoadMetadata(metaDataFilepath); + SegmentStatusManager.readLoadMetadata(metaDataFilepath); for (LoadMetadataDetails loadMetadata : listOfLoadFolderDetailsArray) { if (loadMetadata.getLoadName().equalsIgnoreCase("0")) { @@ -1313,7 +1279,7 @@ public final class CarbonDataMergerUtil { } } try { - segmentStatusManager + SegmentStatusManager .writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetailsArray); } catch (IOException e) { return false; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java index ff65db2..8fc6e66 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/CompactionResultSortProcessor.java @@ -404,8 +404,7 @@ public class CompactionResultSortProcessor extends AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(carbonLoadModel, carbonTable, segmentProperties, tableName, tempStoreLocation); - setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(carbonLoadModel, compactionType, carbonFactDataHandlerModel); dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(carbonFactDataHandlerModel, CarbonFactHandlerFactory.FactHandlerType.COLUMNAR); try { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java index 3d0700b..6f506b1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java +++ b/processing/src/main/java/org/apache/carbondata/processing/merger/RowResultMergerProcessor.java @@ -72,8 +72,7 @@ public class RowResultMergerProcessor extends AbstractResultProcessor { CarbonFactDataHandlerModel carbonFactDataHandlerModel = CarbonFactDataHandlerModel .getCarbonFactDataHandlerModel(loadModel, carbonTable, segProp, tableName, tempStoreLocation); - setDataFileAttributesInModel(loadModel, compactionType, carbonTable, - carbonFactDataHandlerModel); + setDataFileAttributesInModel(loadModel, compactionType, carbonFactDataHandlerModel); carbonFactDataHandlerModel.setCompactionFlow(true); dataHandler = new CarbonFactDataHandlerColumnar(carbonFactDataHandlerModel); } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java index 9f3c86f..bc87823 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerModel.java @@ -35,7 +35,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datamap.DataMapWriterListener; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -308,8 +307,7 @@ public class CarbonFactDataHandlerModel { } carbonFactDataHandlerModel.setMeasureDataType(measureDataTypes); String carbonDataDirectoryPath = CarbonDataProcessorUtil - .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getDatabaseName(), - tableName, loadModel.getSegmentId()); + .checkAndCreateCarbonStoreLocation(carbonTable.getTablePath(), loadModel.getSegmentId()); carbonFactDataHandlerModel.setCarbonDataDirectoryPath(carbonDataDirectoryPath); List<CarbonDimension> dimensionByTableName = carbonTable.getDimensionByTableName(tableName); boolean[] isUseInvertedIndexes = new boolean[dimensionByTableName.size()]; @@ -334,10 +332,9 @@ public class CarbonFactDataHandlerModel { * @return data directory path */ private static String getCarbonDataFolderLocation(CarbonDataLoadConfiguration configuration) { - AbsoluteTableIdentifier absoluteTableIdentifier = configuration.getTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); + AbsoluteTableIdentifier identifier = configuration.getTableIdentifier(); String carbonDataDirectoryPath = - carbonTablePath.getCarbonDataDirectoryPath(configuration.getSegmentId()); + CarbonTablePath.getSegmentPath(identifier.getTablePath(), configuration.getSegmentId()); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java index cfe6e31..ccde9e1 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonDataProcessorUtil.java @@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.datastore.ColumnType; import org.apache.carbondata.core.metadata.CarbonMetadata; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.datatype.DataTypes; import org.apache.carbondata.core.metadata.encoder.Encoding; @@ -44,7 +43,6 @@ import org.apache.carbondata.core.metadata.schema.table.column.CarbonDimension; import org.apache.carbondata.core.metadata.schema.table.column.CarbonMeasure; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.datatypes.ArrayDataType; import org.apache.carbondata.processing.datatypes.GenericDataType; @@ -143,12 +141,9 @@ public final class CarbonDataProcessorUtil { String[] baseTmpStorePathArray = StringUtils.split(baseTempStorePath, File.pathSeparator); String[] localDataFolderLocArray = new String[baseTmpStorePathArray.length]; - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); for (int i = 0 ; i < baseTmpStorePathArray.length; i++) { String tmpStore = baseTmpStorePathArray[i]; - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(tmpStore, carbonTable.getCarbonTableIdentifier()); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath(tmpStore, segmentId); localDataFolderLocArray[i] = carbonDataDirectoryPath + File.separator + taskId; } @@ -375,12 +370,9 @@ public final class CarbonDataProcessorUtil { * @return data directory path */ public static String checkAndCreateCarbonStoreLocation(String factStoreLocation, - String databaseName, String tableName, String segmentId) { - CarbonTable carbonTable = CarbonMetadata.getInstance().getCarbonTable(databaseName, tableName); - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(factStoreLocation, carbonTableIdentifier); - String carbonDataDirectoryPath = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String segmentId) { + String carbonDataDirectoryPath = CarbonTablePath.getSegmentPath( + factStoreLocation, segmentId); CarbonUtil.checkAndCreateFolder(carbonDataDirectoryPath); return carbonDataDirectoryPath; } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java index 7be61d9..c2f4501 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/CarbonLoaderUtil.java @@ -45,7 +45,6 @@ import org.apache.carbondata.core.fileoperations.FileWriteOperation; import org.apache.carbondata.core.locks.CarbonLockUtil; import org.apache.carbondata.core.locks.ICarbonLock; import org.apache.carbondata.core.metadata.AbsoluteTableIdentifier; -import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.metadata.ColumnIdentifier; import org.apache.carbondata.core.metadata.datatype.DataType; import org.apache.carbondata.core.metadata.schema.table.CarbonTable; @@ -54,7 +53,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.processing.loading.model.CarbonLoadModel; import org.apache.carbondata.processing.merger.NodeBlockRelation; @@ -73,11 +71,8 @@ public final class CarbonLoaderUtil { } public static void deleteSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema().getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); - - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath(currentLoad + ""); + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); deleteStorePath(segmentPath); } @@ -90,33 +85,26 @@ public final class CarbonLoaderUtil { */ public static boolean isValidSegment(CarbonLoadModel loadModel, int currentLoad) { - CarbonTable carbonTable = loadModel.getCarbonDataLoadSchema() - .getCarbonTable(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath( - loadModel.getTablePath(), carbonTable.getCarbonTableIdentifier()); int fileCount = 0; - int partitionCount = carbonTable.getPartitionCount(); - for (int i = 0; i < partitionCount; i++) { - String segmentPath = carbonTablePath.getCarbonDataDirectoryPath( - currentLoad + ""); - CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, - FileFactory.getFileType(segmentPath)); - CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { - - @Override - public boolean accept(CarbonFile file) { - return file.getName().endsWith( - CarbonTablePath.getCarbonIndexExtension()) - || file.getName().endsWith( - CarbonTablePath.getCarbonDataExtension()); - } - - }); - fileCount += files.length; - if (files.length > 0) { - return true; + String segmentPath = CarbonTablePath.getSegmentPath( + loadModel.getTablePath(), currentLoad + ""); + CarbonFile carbonFile = FileFactory.getCarbonFile(segmentPath, + FileFactory.getFileType(segmentPath)); + CarbonFile[] files = carbonFile.listFiles(new CarbonFileFilter() { + + @Override + public boolean accept(CarbonFile file) { + return file.getName().endsWith( + CarbonTablePath.getCarbonIndexExtension()) + || file.getName().endsWith( + CarbonTablePath.getCarbonDataExtension()); } + + }); + fileCount += files.length; + if (files.length > 0) { + return true; } if (fileCount == 0) { return false; @@ -149,16 +137,15 @@ public final class CarbonLoaderUtil { CarbonLoadModel loadModel, boolean loadStartEntry, boolean insertOverwrite) throws IOException { boolean status = false; - AbsoluteTableIdentifier absoluteTableIdentifier = + AbsoluteTableIdentifier identifier = loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier(); - CarbonTablePath carbonTablePath = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String metadataPath = carbonTablePath.getMetadataDirectoryPath(); + String metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath()); FileType fileType = FileFactory.getFileType(metadataPath); if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType); } - String tableStatusPath = carbonTablePath.getTableStatusFilePath(); - SegmentStatusManager segmentStatusManager = new SegmentStatusManager(absoluteTableIdentifier); + String tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); + SegmentStatusManager segmentStatusManager = new SegmentStatusManager(identifier); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); int retryCount = CarbonLockUtil .getLockProperty(CarbonCommonConstants.NUMBER_OF_TRIES_FOR_CONCURRENT_LOCK, @@ -172,7 +159,8 @@ public final class CarbonLoaderUtil { "Acquired lock for table" + loadModel.getDatabaseName() + "." + loadModel.getTableName() + " for table status updation"); LoadMetadataDetails[] listOfLoadFolderDetailsArray = - SegmentStatusManager.readLoadMetadata(carbonTablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(identifier.getTablePath())); List<LoadMetadataDetails> listOfLoadFolderDetails = new ArrayList<>(CarbonCommonConstants.DEFAULT_COLLECTION_SIZE); List<CarbonFile> staleFolders = new ArrayList<>(); @@ -198,13 +186,13 @@ public final class CarbonLoaderUtil { // is triggered for (LoadMetadataDetails entry : listOfLoadFolderDetails) { if (entry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS - && segmentStatusManager.checkIfValidLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + && SegmentStatusManager.checkIfValidLoadInProgress( + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert overwrite is in progress"); } else if (newMetaEntry.getSegmentStatus() == SegmentStatus.INSERT_OVERWRITE_IN_PROGRESS && entry.getSegmentStatus() == SegmentStatus.INSERT_IN_PROGRESS - && segmentStatusManager.checkIfValidLoadInProgress( - absoluteTableIdentifier, entry.getLoadName())) { + && SegmentStatusManager.checkIfValidLoadInProgress( + identifier, entry.getLoadName())) { throw new RuntimeException("Already insert into or load is in progress"); } } @@ -227,7 +215,7 @@ public final class CarbonLoaderUtil { entry.setSegmentStatus(SegmentStatus.MARKED_FOR_DELETE); // For insert overwrite, we will delete the old segment folder immediately // So collect the old segments here - addToStaleFolders(carbonTablePath, staleFolders, entry); + addToStaleFolders(identifier, staleFolders, entry); } } } @@ -236,7 +224,7 @@ public final class CarbonLoaderUtil { // when no records are inserted then newSegmentEntry will be SegmentStatus.MARKED_FOR_DELETE // so empty segment folder should be deleted if (newMetaEntry.getSegmentStatus() == SegmentStatus.MARKED_FOR_DELETE) { - addToStaleFolders(carbonTablePath, staleFolders, newMetaEntry); + addToStaleFolders(identifier, staleFolders, newMetaEntry); } SegmentStatusManager.writeLoadDetailsIntoFile(tableStatusPath, listOfLoadFolderDetails @@ -270,9 +258,10 @@ public final class CarbonLoaderUtil { return status; } - private static void addToStaleFolders(CarbonTablePath carbonTablePath, + private static void addToStaleFolders(AbsoluteTableIdentifier identifier, List<CarbonFile> staleFolders, LoadMetadataDetails entry) throws IOException { - String path = carbonTablePath.getCarbonDataDirectoryPath(entry.getLoadName()); + String path = CarbonTablePath.getSegmentPath( + identifier.getTablePath(), entry.getLoadName()); // add to the deletion list only if file exist else HDFS file system will throw // exception while deleting the file if file path does not exist if (FileFactory.isFileExist(path, FileFactory.getFileType(path))) { @@ -298,11 +287,9 @@ public final class CarbonLoaderUtil { loadMetadataDetails.setLoadStartTime(loadStartTime); } - public static void writeLoadMetadata(AbsoluteTableIdentifier absoluteTableIdentifier, + public static void writeLoadMetadata(AbsoluteTableIdentifier identifier, List<LoadMetadataDetails> listOfLoadFolderDetails) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); - String dataLoadLocation = carbonTablePath.getTableStatusFilePath(); + String dataLoadLocation = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath()); DataOutputStream dataOutputStream; Gson gsonObjectToWrite = new Gson(); @@ -838,10 +825,8 @@ public final class CarbonLoaderUtil { * This method will get the store location for the given path, segment id and partition id */ public static void checkAndCreateCarbonDataLocation(String segmentId, CarbonTable carbonTable) { - CarbonTableIdentifier carbonTableIdentifier = carbonTable.getCarbonTableIdentifier(); - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getTablePath(), carbonTableIdentifier); - String segmentFolder = carbonTablePath.getCarbonDataDirectoryPath(segmentId); + String segmentFolder = CarbonTablePath.getSegmentPath( + carbonTable.getTablePath(), segmentId); CarbonUtil.checkAndCreateFolder(segmentFolder); } @@ -870,10 +855,8 @@ public final class CarbonLoaderUtil { */ public static Long addDataIndexSizeIntoMetaEntry(LoadMetadataDetails loadMetadataDetails, String segmentId, CarbonTable carbonTable) throws IOException { - CarbonTablePath carbonTablePath = - CarbonStorePath.getCarbonTablePath((carbonTable.getAbsoluteTableIdentifier())); Map<String, Long> dataIndexSize = - CarbonUtil.getDataSizeAndIndexSize(carbonTablePath, segmentId); + CarbonUtil.getDataSizeAndIndexSize(carbonTable.getTablePath(), segmentId); Long dataSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_DATA_SIZE); loadMetadataDetails.setDataSize(String.valueOf(dataSize)); Long indexSize = dataIndexSize.get(CarbonCommonConstants.CARBON_TOTAL_INDEX_SIZE); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java index f9f3e20..1fdce32 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java +++ b/processing/src/main/java/org/apache/carbondata/processing/util/DeleteLoadFolders.java @@ -32,7 +32,6 @@ import org.apache.carbondata.core.mutate.CarbonUpdateUtil; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; public final class DeleteLoadFolders { @@ -47,15 +46,14 @@ public final class DeleteLoadFolders { /** * returns segment path * - * @param absoluteTableIdentifier + * @param identifier * @param oneLoad * @return */ - private static String getSegmentPath(AbsoluteTableIdentifier absoluteTableIdentifier, + private static String getSegmentPath(AbsoluteTableIdentifier identifier, LoadMetadataDetails oneLoad) { - CarbonTablePath carbon = CarbonStorePath.getCarbonTablePath(absoluteTableIdentifier); String segmentId = oneLoad.getLoadName(); - return carbon.getCarbonDataDirectoryPath(segmentId); + return CarbonTablePath.getSegmentPath(identifier.getTablePath(), segmentId); } public static void physicalFactAndMeasureMetadataDeletion( http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java index 7925b35..e059b35 100644 --- a/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java +++ b/processing/src/test/java/org/apache/carbondata/carbon/datastore/BlockIndexStoreTest.java @@ -73,12 +73,12 @@ public class BlockIndexStoreTest extends TestCase { // file.length(), ColumnarFormatVersion.V1, null); // CarbonTableIdentifier carbonTableIdentifier = // new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = +// AbsoluteTableIdentifier identifier = // new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); // try { // // List<TableBlockUniqueIdentifier> tableBlockInfoList = -// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), absoluteTableIdentifier); +// getTableBlockUniqueIdentifierList(Arrays.asList(new TableBlockInfo[] { info }), identifier); // List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockInfoList); // assertTrue(loadAndGetBlocks.size() == 1); // } catch (Exception e) { @@ -86,7 +86,7 @@ public class BlockIndexStoreTest extends TestCase { // } // List<String> segmentIds = new ArrayList<>(); // segmentIds.add(info.getSegmentId()); -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); +// cache.removeTableBlocks(segmentIds, identifier); // } // private List<TableBlockUniqueIdentifier> getTableBlockUniqueIdentifierList(List<TableBlockInfo> tableBlockInfos, @@ -122,19 +122,19 @@ public class BlockIndexStoreTest extends TestCase { // // CarbonTableIdentifier carbonTableIdentifier = // new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = +// AbsoluteTableIdentifier identifier = // new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); // ExecutorService executor = Executors.newFixedThreadPool(3); // executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit( // new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit( // new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); +// identifier)); // executor.shutdown(); // try { // executor.awaitTermination(1, TimeUnit.DAYS); @@ -145,7 +145,7 @@ public class BlockIndexStoreTest extends TestCase { // Arrays.asList(new TableBlockInfo[] { info, info1, info2, info3, info4 }); // try { // List<TableBlockUniqueIdentifier> tableBlockUniqueIdentifiers = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); +// getTableBlockUniqueIdentifierList(tableBlockInfos, identifier); // List<AbstractIndex> loadAndGetBlocks = cache.getAll(tableBlockUniqueIdentifiers); // assertTrue(loadAndGetBlocks.size() == 5); // } catch (Exception e) { @@ -155,7 +155,7 @@ public class BlockIndexStoreTest extends TestCase { // for (TableBlockInfo tableBlockInfo : tableBlockInfos) { // segmentIds.add(tableBlockInfo.getSegmentId()); // } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); +// cache.removeTableBlocks(segmentIds, identifier); // } // // public void testloadAndGetTaskIdToSegmentsMapForDifferentSegmentLoadedConcurrently() @@ -193,18 +193,18 @@ public class BlockIndexStoreTest extends TestCase { // // CarbonTableIdentifier carbonTableIdentifier = // new CarbonTableIdentifier(CarbonCommonConstants.DATABASE_DEFAULT_NAME, "t3", "1"); -// AbsoluteTableIdentifier absoluteTableIdentifier = +// AbsoluteTableIdentifier identifier = // new AbsoluteTableIdentifier("/src/test/resources", carbonTableIdentifier); // ExecutorService executor = Executors.newFixedThreadPool(3); // executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info, info1 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit( // new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info2, info3, info4 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info5, info6 }), -// absoluteTableIdentifier)); +// identifier)); // executor.submit(new BlockLoaderThread(Arrays.asList(new TableBlockInfo[] { info7 }), -// absoluteTableIdentifier)); +// identifier)); // // executor.shutdown(); // try { @@ -217,7 +217,7 @@ public class BlockIndexStoreTest extends TestCase { // .asList(new TableBlockInfo[] { info, info1, info2, info3, info4, info5, info6, info7 }); // try { // List<TableBlockUniqueIdentifier> blockUniqueIdentifierList = -// getTableBlockUniqueIdentifierList(tableBlockInfos, absoluteTableIdentifier); +// getTableBlockUniqueIdentifierList(tableBlockInfos, identifier); // List<AbstractIndex> loadAndGetBlocks = cache.getAll(blockUniqueIdentifierList); // assertTrue(loadAndGetBlocks.size() == 8); // } catch (Exception e) { @@ -227,7 +227,7 @@ public class BlockIndexStoreTest extends TestCase { // for (TableBlockInfo tableBlockInfo : tableBlockInfos) { // segmentIds.add(tableBlockInfo.getSegmentId()); // } -// cache.removeTableBlocks(segmentIds, absoluteTableIdentifier); +// cache.removeTableBlocks(segmentIds, identifier); // } private class BlockLoaderThread implements Callable<Void> { @@ -250,7 +250,7 @@ public class BlockIndexStoreTest extends TestCase { } private static File getPartFile() { - String path = StoreCreator.getAbsoluteTableIdentifier().getTablePath() + String path = StoreCreator.getIdentifier().getTablePath() + "/Fact/Part0/Segment_0"; File file = new File(path); File[] files = file.listFiles(); http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java ---------------------------------------------------------------------- diff --git a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java index 7f0aef6..d42dcde 100644 --- a/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java +++ b/processing/src/test/java/org/apache/carbondata/processing/StoreCreator.java @@ -64,7 +64,6 @@ import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonDictionaryWriter; import org.apache.carbondata.core.writer.CarbonDictionaryWriterImpl; @@ -98,14 +97,14 @@ import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; */ public class StoreCreator { - private static AbsoluteTableIdentifier absoluteTableIdentifier; + private static AbsoluteTableIdentifier identifier; private static String storePath = ""; static { try { storePath = new File("target/store").getCanonicalPath(); String dbName = "testdb"; String tableName = "testtable"; - absoluteTableIdentifier = + identifier = AbsoluteTableIdentifier.from( storePath + "/testdb/testtable", new CarbonTableIdentifier(dbName, tableName, UUID.randomUUID().toString())); @@ -114,8 +113,8 @@ public class StoreCreator { } } - public static AbsoluteTableIdentifier getAbsoluteTableIdentifier() { - return absoluteTableIdentifier; + public static AbsoluteTableIdentifier getIdentifier() { + return identifier; } /** @@ -134,12 +133,12 @@ public class StoreCreator { CarbonDataLoadSchema schema = new CarbonDataLoadSchema(table); CarbonLoadModel loadModel = new CarbonLoadModel(); loadModel.setCarbonDataLoadSchema(schema); - loadModel.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); - loadModel.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + loadModel.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); + loadModel.setTableName(identifier.getCarbonTableIdentifier().getTableName()); loadModel.setFactFilePath(factFilePath); loadModel.setLoadMetadataDetails(new ArrayList<LoadMetadataDetails>()); - loadModel.setTablePath(absoluteTableIdentifier.getTablePath()); + loadModel.setTablePath(identifier.getTablePath()); loadModel.setDateFormat(null); loadModel.setDefaultTimestampFormat(CarbonProperties.getInstance().getProperty( CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT, @@ -175,9 +174,9 @@ public class StoreCreator { private static CarbonTable createTable() throws IOException { TableInfo tableInfo = new TableInfo(); - tableInfo.setDatabaseName(absoluteTableIdentifier.getCarbonTableIdentifier().getDatabaseName()); + tableInfo.setDatabaseName(identifier.getCarbonTableIdentifier().getDatabaseName()); TableSchema tableSchema = new TableSchema(); - tableSchema.setTableName(absoluteTableIdentifier.getCarbonTableIdentifier().getTableName()); + tableSchema.setTableName(identifier.getCarbonTableIdentifier().getTableName()); List<ColumnSchema> columnSchemas = new ArrayList<ColumnSchema>(); ArrayList<Encoding> encodings = new ArrayList<>(); encodings.add(Encoding.DICTIONARY); @@ -257,16 +256,13 @@ public class StoreCreator { tableSchema.setSchemaEvalution(schemaEvol); tableSchema.setTableId(UUID.randomUUID().toString()); tableInfo.setTableUniqueName( - absoluteTableIdentifier.getCarbonTableIdentifier().getTableUniqueName() + identifier.getCarbonTableIdentifier().getTableUniqueName() ); tableInfo.setLastUpdatedTime(System.currentTimeMillis()); tableInfo.setFactTable(tableSchema); - tableInfo.setTablePath(absoluteTableIdentifier.getTablePath()); + tableInfo.setTablePath(identifier.getTablePath()); - CarbonTablePath carbonTablePath = CarbonStorePath - .getCarbonTablePath(absoluteTableIdentifier.getTablePath(), - absoluteTableIdentifier.getCarbonTableIdentifier()); - String schemaFilePath = carbonTablePath.getSchemaFilePath(); + String schemaFilePath = CarbonTablePath.getSchemaFilePath(identifier.getTablePath()); String schemaMetadataPath = CarbonTablePath.getFolderContainingFile(schemaFilePath); CarbonMetadata.getInstance().loadTableMetadata(tableInfo); @@ -329,7 +325,7 @@ public class StoreCreator { writer.close(); writer.commit(); Dictionary dict = (Dictionary) dictCache.get( - new DictionaryColumnUniqueIdentifier(absoluteTableIdentifier, + new DictionaryColumnUniqueIdentifier(identifier, columnIdentifier, dims.get(i).getDataType())); CarbonDictionarySortInfoPreparator preparator = new CarbonDictionarySortInfoPreparator(); @@ -444,7 +440,7 @@ public class StoreCreator { loadMetadataDetails.setLoadStartTime(loadMetadataDetails.getTimeStamp(readCurrentTime())); listOfLoadFolderDetails.add(loadMetadataDetails); - String dataLoadLocation = schema.getCarbonTable().getMetaDataFilepath() + File.separator + String dataLoadLocation = schema.getCarbonTable().getMetadataPath() + File.separator + CarbonCommonConstants.LOADMETADATA_FILENAME; DataOutputStream dataOutputStream; http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java ---------------------------------------------------------------------- diff --git a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java index 7b823ac..8c9889d 100644 --- a/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java +++ b/streaming/src/main/java/org/apache/carbondata/streaming/segment/StreamSegment.java @@ -38,7 +38,6 @@ import org.apache.carbondata.core.statusmanager.FileFormat; import org.apache.carbondata.core.statusmanager.LoadMetadataDetails; import org.apache.carbondata.core.statusmanager.SegmentStatus; import org.apache.carbondata.core.statusmanager.SegmentStatusManager; -import org.apache.carbondata.core.util.path.CarbonStorePath; import org.apache.carbondata.core.util.path.CarbonTablePath; import org.apache.carbondata.core.writer.CarbonIndexFileWriter; import org.apache.carbondata.format.BlockIndex; @@ -60,8 +59,6 @@ public class StreamSegment { * get stream segment or create new stream segment if not exists */ public static String open(CarbonTable table) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -72,7 +69,8 @@ public class StreamSegment { + " for stream table get or create segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); LoadMetadataDetails streamSegment = null; for (LoadMetadataDetails detail : details) { if (FileFormat.ROW_V1 == detail.getFileFormat()) { @@ -97,8 +95,8 @@ public class StreamSegment { newDetails[i] = details[i]; } newDetails[i] = newDetail; - SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + SegmentStatusManager.writeLoadDetailsIntoFile( + CarbonTablePath.getTableStatusFilePath(table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { return streamSegment.getLoadName(); @@ -126,8 +124,6 @@ public class StreamSegment { */ public static String close(CarbonTable table, String segmentId) throws IOException { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(table.getAbsoluteTableIdentifier()); SegmentStatusManager segmentStatusManager = new SegmentStatusManager(table.getAbsoluteTableIdentifier()); ICarbonLock carbonLock = segmentStatusManager.getTableStatusLock(); @@ -138,7 +134,8 @@ public class StreamSegment { + " for stream table finish segment"); LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(tablePath.getMetadataDirectoryPath()); + SegmentStatusManager.readLoadMetadata( + CarbonTablePath.getMetadataPath(table.getTablePath())); for (LoadMetadataDetails detail : details) { if (segmentId.equals(detail.getLoadName())) { detail.setLoadEndTime(System.currentTimeMillis()); @@ -162,7 +159,8 @@ public class StreamSegment { } newDetails[i] = newDetail; SegmentStatusManager - .writeLoadDetailsIntoFile(tablePath.getTableStatusFilePath(), newDetails); + .writeLoadDetailsIntoFile(CarbonTablePath.getTableStatusFilePath( + table.getTablePath()), newDetails); return newDetail.getLoadName(); } else { LOGGER.error( @@ -192,7 +190,7 @@ public class StreamSegment { try { if (statusLock.lockWithRetries()) { LoadMetadataDetails[] details = - SegmentStatusManager.readLoadMetadata(carbonTable.getMetaDataFilepath()); + SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath()); boolean updated = false; for (LoadMetadataDetails detail : details) { if (SegmentStatus.STREAMING == detail.getSegmentStatus()) { @@ -202,10 +200,8 @@ public class StreamSegment { } } if (updated) { - CarbonTablePath tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier()); SegmentStatusManager.writeLoadDetailsIntoFile( - tablePath.getTableStatusFilePath(), + CarbonTablePath.getTableStatusFilePath(carbonTable.getTablePath()), details); } } else { http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala index 197cb14..186d100 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala @@ -37,7 +37,7 @@ import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.scan.result.iterator.RawResultIterator import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager} import org.apache.carbondata.core.util.CarbonUtil -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection} import org.apache.carbondata.hadoop.api.CarbonTableInputFormat import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader} @@ -216,7 +216,6 @@ object StreamHandoffRDD { ): Unit = { val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable val identifier = carbonTable.getAbsoluteTableIdentifier - val tablePath = CarbonStorePath.getCarbonTablePath(identifier) var continueHandoff = false // require handoff lock on table val lock = CarbonLockFactory.getCarbonLockObj(identifier, LockUsage.HANDOFF_LOCK) @@ -233,7 +232,7 @@ object StreamHandoffRDD { try { if (statusLock.lockWithRetries()) { loadMetadataDetails = SegmentStatusManager.readLoadMetadata( - tablePath.getMetadataDirectoryPath) + CarbonTablePath.getMetadataPath(identifier.getTablePath)) } } finally { if (null != statusLock) { @@ -355,19 +354,16 @@ object StreamHandoffRDD { loadModel: CarbonLoadModel ): Boolean = { var status = false - val metaDataFilepath = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getMetaDataFilepath() - val identifier = - loadModel.getCarbonDataLoadSchema().getCarbonTable().getAbsoluteTableIdentifier() - val carbonTablePath = CarbonStorePath.getCarbonTablePath(identifier) - val metadataPath = carbonTablePath.getMetadataDirectoryPath() + val metaDataFilepath = loadModel.getCarbonDataLoadSchema.getCarbonTable.getMetadataPath + val identifier = loadModel.getCarbonDataLoadSchema.getCarbonTable.getAbsoluteTableIdentifier + val metadataPath = CarbonTablePath.getMetadataPath(identifier.getTablePath) val fileType = FileFactory.getFileType(metadataPath) if (!FileFactory.isFileExist(metadataPath, fileType)) { FileFactory.mkdirs(metadataPath, fileType) } - val tableStatusPath = carbonTablePath.getTableStatusFilePath() + val tableStatusPath = CarbonTablePath.getTableStatusFilePath(identifier.getTablePath) val segmentStatusManager = new SegmentStatusManager(identifier) - val carbonLock = segmentStatusManager.getTableStatusLock() + val carbonLock = segmentStatusManager.getTableStatusLock try { if (carbonLock.lockWithRetries()) { LOGGER.info( @@ -400,7 +396,7 @@ object StreamHandoffRDD { status = true } else { LOGGER.error("Not able to acquire the lock for Table status updation for table " + loadModel - .getDatabaseName() + "." + loadModel.getTableName()); + .getDatabaseName() + "." + loadModel.getTableName()) } } finally { if (carbonLock.unlock()) { @@ -411,6 +407,6 @@ object StreamHandoffRDD { "." + loadModel.getTableName() + " during table status updation") } } - return status + status } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala index f2274be..c417fbe 100644 --- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala +++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamSinkFactory.scala @@ -31,7 +31,7 @@ import org.apache.carbondata.core.dictionary.service.NonSecureDictionaryServiceP import org.apache.carbondata.core.metadata.encoder.Encoding import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostExecutionEvent, LoadTablePreExecutionEvent} import org.apache.carbondata.processing.loading.model.CarbonLoadModel @@ -126,16 +126,14 @@ object StreamSinkFactory { * @return */ private def getStreamSegmentId(carbonTable: CarbonTable): String = { - val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val fileType = FileFactory.getFileType(carbonTablePath.getMetadataDirectoryPath) - if (!FileFactory.isFileExist(carbonTablePath.getMetadataDirectoryPath, fileType)) { + val segmentId = StreamSegment.open(carbonTable) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) + val fileType = FileFactory.getFileType(segmentDir) + if (!FileFactory.isFileExist(segmentDir, fileType)) { // Create table directory path, in case of enabling hive metastore first load may not have // table folder created. - FileFactory.mkdirs(carbonTablePath.getMetadataDirectoryPath, fileType) + FileFactory.mkdirs(segmentDir, fileType) } - val segmentId = StreamSegment.open(carbonTable) - val segmentDir = carbonTablePath.getSegmentDir(segmentId) if (FileFactory.isFileExist(segmentDir, fileType)) { // recover fault StreamSegment.recoverSegmentIfRequired(segmentDir) http://git-wip-us.apache.org/repos/asf/carbondata/blob/c3e99681/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala ---------------------------------------------------------------------- diff --git a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala index 45bc19a..ff483e5 100644 --- a/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala +++ b/streaming/src/main/scala/org/apache/spark/sql/execution/streaming/CarbonAppendableStreamSink.scala @@ -40,7 +40,7 @@ import org.apache.carbondata.core.dictionary.server.DictionaryServer import org.apache.carbondata.core.metadata.schema.table.CarbonTable import org.apache.carbondata.core.stats.QueryStatistic import org.apache.carbondata.core.util.CarbonProperties -import org.apache.carbondata.core.util.path.CarbonStorePath +import org.apache.carbondata.core.util.path.CarbonTablePath import org.apache.carbondata.events.{OperationContext, OperationListenerBus} import org.apache.carbondata.hadoop.streaming.CarbonStreamOutputFormat import org.apache.carbondata.hadoop.util.CarbonInputFormatUtil @@ -62,9 +62,7 @@ class CarbonAppendableStreamSink( carbonLoadModel: CarbonLoadModel, server: Option[DictionaryServer]) extends Sink { - private val carbonTablePath = CarbonStorePath - .getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - private val fileLogPath = carbonTablePath.getStreamingLogDir + private val fileLogPath = CarbonTablePath.getStreamingLogDir(carbonTable.getTablePath) private val fileLog = new FileStreamSinkLog(FileStreamSinkLog.VERSION, sparkSession, fileLogPath) // prepare configuration private val hadoopConf = { @@ -149,12 +147,12 @@ class CarbonAppendableStreamSink( * if the directory size of current segment beyond the threshold, hand off new segment */ private def checkOrHandOffSegment(): Unit = { - val segmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) val fileType = FileFactory.getFileType(segmentDir) if (segmentMaxSize <= StreamSegment.size(segmentDir)) { val newSegmentId = StreamSegment.close(carbonTable, currentSegmentId) currentSegmentId = newSegmentId - val newSegmentDir = carbonTablePath.getSegmentDir(currentSegmentId) + val newSegmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, currentSegmentId) FileFactory.mkdirs(newSegmentDir, fileType) // TODO trigger hand off operation @@ -250,15 +248,13 @@ object CarbonAppendableStreamSink { } // update data file info in index file - val tablePath = CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - StreamSegment.updateIndexFile(tablePath.getSegmentDir(segmentId)) + StreamSegment.updateIndexFile( + CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId)) } catch { // catch fault of executor side case t: Throwable => - val tablePath = - CarbonStorePath.getCarbonTablePath(carbonTable.getAbsoluteTableIdentifier) - val segmentDir = tablePath.getSegmentDir(segmentId) + val segmentDir = CarbonTablePath.getSegmentPath(carbonTable.getTablePath, segmentId) StreamSegment.recoverSegmentIfRequired(segmentDir) LOGGER.error(t, s"Aborting job ${ job.getJobID }.") committer.abortJob(job)