This is an automated email from the ASF dual-hosted git repository.

liuzhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/carbondata.git


The following commit(s) were added to refs/heads/master by this push:
     new 1d316fc  [CARBONDATA-3877] Reduce read tablestatus overhead during 
inserting into partition table
1d316fc is described below

commit 1d316fc0ee3af9ecfda7208379051905b8460dde
Author: haomarch <marchp...@126.com>
AuthorDate: Sun Jun 28 19:21:19 2020 +0800

    [CARBONDATA-3877] Reduce read tablestatus overhead during inserting into 
partition table
    
    Why is this PR needed?
    Currently, inserting into a partition table performs a lot of 
tablestatus read operations. When the table status file is stored in an object 
store, reading it may fail (with an IOException or a JsonSyntaxException) 
while the table status file is being modified, which leads to a high failure 
rate for concurrent inserts into a partition table.
    
    What changes were proposed in this PR?
    (1) Three pieces of code were removed: calculate sizeInBytes, clean segments, 
deleteLoadsAndUpdateMetadata
    'calculate sizeInBytes' is not needed in the insert-into flow. 'clean 
segments' and 'deleteLoadsAndUpdateMetadata' are already supported by the 'clean files' 
command, so they can be removed from the insert-into flow.
    (2) Remove duplicate tablestatus reads and restrict the remaining reads to 
the cases where tablestatus is actually needed.
    
    Does this PR introduce any user interface change?
    No
    
    Is any new testcase added?
    No
    
    This closes #3800
---
 .../carbondata/core/metadata/SegmentFileStore.java  | 17 +++++++++++++++++
 .../hadoop/api/CarbonOutputCommitter.java           | 11 +++++++----
 .../org/apache/spark/rdd/CarbonMergeFilesRDD.scala  |  7 ++++---
 .../management/CarbonInsertIntoCommand.scala        |  2 --
 .../command/management/CommonLoadUtils.scala        |  5 +----
 .../datasources/SparkCarbonTableFormat.scala        | 21 ++++++++++++++++-----
 .../SILoadEventListenerForFailedSegments.scala      |  3 ++-
 .../loading/TableProcessingOperations.java          |  3 +--
 8 files changed, 48 insertions(+), 21 deletions(-)

diff --git 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
index 335e0f5..2f274c3 100644
--- 
a/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
+++ 
b/core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
@@ -1149,6 +1149,23 @@ public class SegmentFileStore {
    * @throws IOException
    */
   public static List<PartitionSpec> getPartitionSpecs(String segmentId, String 
tablePath,
+      String segmentFilePath, String loadStartTime) throws IOException {
+    SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segmentFilePath);
+    List<PartitionSpec> partitionSpecs = fileStore.getPartitionSpecs();
+    for (PartitionSpec spec : partitionSpecs) {
+      spec.setUuid(segmentId + "_" + loadStartTime);
+    }
+    return partitionSpecs;
+  }
+
+  /**
+   * Get the partition specs of the segment
+   * @param segmentId
+   * @param tablePath
+   * @return
+   * @throws IOException
+   */
+  public static List<PartitionSpec> getPartitionSpecs(String segmentId, String 
tablePath,
       LoadMetadataDetails[] details)
       throws IOException {
     LoadMetadataDetails segEntry = null;
diff --git 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
index 02c8d4c..4b8fc43 100644
--- 
a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
+++ 
b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonOutputCommitter.java
@@ -232,10 +232,13 @@ public class CarbonOutputCommitter extends 
FileOutputCommitter {
     String segmentsToBeDeleted =
         
context.getConfiguration().get(CarbonTableOutputFormat.SEGMENTS_TO_BE_DELETED, 
"");
     List<Segment> segmentDeleteList = 
Segment.toSegmentList(segmentsToBeDeleted.split(","), null);
-    Set<Segment> segmentSet = new HashSet<>(
-        new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
-            
context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
-            .getValidSegments());
+    Set<Segment> segmentSet = new HashSet<>();
+    if (updateTime != null || uniqueId != null) {
+      segmentSet = new HashSet<>(
+          new SegmentStatusManager(carbonTable.getAbsoluteTableIdentifier(),
+              
context.getConfiguration()).getValidAndInvalidSegments(carbonTable.isMV())
+                  .getValidSegments());
+    }
     if (updateTime != null) {
       CarbonUpdateUtil.updateTableMetadataStatus(segmentSet, carbonTable, 
updateTime, true,
           segmentDeleteList);
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
 
b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
index ebac5e4..695ee27 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/rdd/CarbonMergeFilesRDD.scala
@@ -229,14 +229,15 @@ class CarbonMergeFilesRDD(
 
   override def internalGetPartitions: Array[Partition] = {
     if (isHivePartitionedTable) {
-      val metadataDetails = SegmentStatusManager
-        
.readLoadMetadata(CarbonTablePath.getMetadataPath(carbonTable.getTablePath))
       // in case of partition table make rdd partitions per partition of the 
carbon table
       val partitionPaths: java.util.Map[String, java.util.List[String]] = new 
java.util.HashMap()
       if (partitionInfo == null || partitionInfo.isEmpty) {
         segments.foreach(segment => {
+          val loadStartTime = segmentFileNameToSegmentIdMap.get(segment)
+          val segmentFileName = SegmentFileStore.genSegmentFileName(
+            segment, loadStartTime) + CarbonTablePath.SEGMENT_EXT
           val partitionSpecs = SegmentFileStore
-            .getPartitionSpecs(segment, carbonTable.getTablePath, 
metadataDetails)
+            .getPartitionSpecs(segment, carbonTable.getTablePath, 
segmentFileName, loadStartTime)
             .asScala.map(_.getLocation.toString)
           partitionPaths.put(segment, partitionSpecs.asJava)
         })
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
index 8dfad76..8c14917 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoCommand.scala
@@ -202,8 +202,6 @@ case class CarbonInsertIntoCommand(databaseNameOp: 
Option[String],
           updateModel = None,
           operationContext = operationContext)
 
-      // Clean up the old invalid segment data before creating a new entry for 
new load.
-      SegmentStatusManager.deleteLoadsAndUpdateMetadata(table, false, 
currPartitions)
       // add the start entry for the new load in the table status file
       if (!table.isHivePartitionTable) {
         CarbonLoaderUtil.readAndUpdateLoadProgressInTableMeta(
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
index 20d29d8..71649b8 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CommonLoadUtils.scala
@@ -126,7 +126,6 @@ object CommonLoadUtils {
             TableIdentifier(tableName, databaseNameOp))).collect {
           case l: LogicalRelation => l
         }.head
-      sizeInBytes = logicalPartitionRelation.relation.sizeInBytes
       finalPartition = getCompletePartitionValues(partition, table)
     }
     (sizeInBytes, table, dbName, logicalPartitionRelation, finalPartition)
@@ -843,8 +842,6 @@ object CommonLoadUtils {
   def loadDataWithPartition(loadParams: CarbonLoadParams): Seq[Row] = {
     val table = 
loadParams.carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
     val catalogTable: CatalogTable = 
loadParams.logicalPartitionRelation.catalogTable.get
-    // Clean up the already dropped partitioned data
-    SegmentFileStore.cleanSegments(table, null, false)
     CarbonUtils.threadSet("partition.operationcontext", 
loadParams.operationContext)
     val attributes = if (loadParams.scanResultRDD.isDefined) {
       // take the already re-arranged attributes
@@ -1102,7 +1099,7 @@ object CommonLoadUtils {
     val specs =
       
SegmentFileStore.getPartitionSpecs(loadParams.carbonLoadModel.getSegmentId,
         loadParams.carbonLoadModel.getTablePath,
-        
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(table.getTablePath)))
+        loadParams.carbonLoadModel.getLoadMetadataDetails.asScala.toArray)
     if (specs != null) {
       specs.asScala.map { spec =>
         Row(spec.getPartitions.asScala.mkString("/"), 
spec.getLocation.toString, spec.getUuid)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
index f1f0b80..225daf5 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/execution/datasources/SparkCarbonTableFormat.scala
@@ -159,13 +159,16 @@ with Serializable {
     if (currEntry != null) {
       val loadEntry =
         
ObjectSerializationUtil.convertStringToObject(currEntry).asInstanceOf[LoadMetadataDetails]
-      val details =
-        
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
       model.setSegmentId(loadEntry.getLoadName)
       model.setFactTimeStamp(loadEntry.getLoadStartTime)
-      val list = new util.ArrayList[LoadMetadataDetails](details.toList.asJava)
-      list.add(loadEntry)
-      model.setLoadMetadataDetails(list)
+      if (!isLoadDetailsContainTheCurrentEntry(
+        model.getLoadMetadataDetails.asScala.toArray, loadEntry)) {
+        val details =
+          
SegmentStatusManager.readLoadMetadata(CarbonTablePath.getMetadataPath(model.getTablePath))
+        val list = new 
util.ArrayList[LoadMetadataDetails](details.toList.asJava)
+        list.add(loadEntry)
+        model.setLoadMetadataDetails(list)
+      }
     }
     // Set the update timestamp if user sets in case of update query. It needs 
to be updated
     // in load status update time
@@ -224,6 +227,14 @@ with Serializable {
     }
   }
   override def equals(other: Any): Boolean = 
other.isInstanceOf[SparkCarbonTableFormat]
+
+  // Returns true when loadDetails already holds an entry with currentEntry's load name.
+  private def isLoadDetailsContainTheCurrentEntry(
+      loadDetails: Array[LoadMetadataDetails],
+      currentEntry: LoadMetadataDetails): Boolean = {
+    // Scan newest-first; `length - 1 to 0` is an empty Range without `by -1`, so use an iterator.
+    loadDetails.reverseIterator.exists(_.getLoadName.equals(currentEntry.getLoadName))
+  }
 }
 
 case class CarbonSQLHadoopMapReduceCommitProtocol(jobId: String, path: String, 
isAppend: Boolean)
diff --git 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
index 2e6a441..2071385 100644
--- 
a/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
+++ 
b/integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/SILoadEventListenerForFailedSegments.scala
@@ -64,7 +64,6 @@ class SILoadEventListenerForFailedSegments extends 
OperationEventListener with L
           .lookupRelation(Some(carbonLoadModel.getDatabaseName),
             
carbonLoadModel.getTableName)(sparkSession).asInstanceOf[CarbonRelation].carbonTable
         val indexMetadata = carbonTable.getIndexMetadata
-        val mainTableDetails = 
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
         val secondaryIndexProvider = IndexType.SI.getIndexProviderName
         if (null != indexMetadata && null != indexMetadata.getIndexesMap &&
             null != indexMetadata.getIndexesMap.get(secondaryIndexProvider)) {
@@ -72,6 +71,8 @@ class SILoadEventListenerForFailedSegments extends 
OperationEventListener with L
             .get(secondaryIndexProvider).keySet().asScala
           // if there are no index tables for a given fact table do not 
perform any action
           if (indexTables.nonEmpty) {
+            val mainTableDetails =
+              
SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
             indexTables.foreach {
               indexTableName =>
                 val isLoadSIForFailedSegments = 
sparkSession.sessionState.catalog
diff --git 
a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
 
b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
index 358295d..f0f14d6 100644
--- 
a/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
+++ 
b/processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
@@ -54,11 +54,10 @@ public class TableProcessingOperations {
   public static void deletePartialLoadDataIfExist(CarbonTable carbonTable,
       final boolean isCompactionFlow) throws IOException {
     String metaDataLocation = carbonTable.getMetadataPath();
-    final LoadMetadataDetails[] details = 
SegmentStatusManager.readLoadMetadata(metaDataLocation);
-
     //delete folder which metadata no exist in tablestatus
     String partitionPath = 
CarbonTablePath.getPartitionDir(carbonTable.getTablePath());
     if (FileFactory.isFileExist(partitionPath)) {
+      final LoadMetadataDetails[] details = 
SegmentStatusManager.readLoadMetadata(metaDataLocation);
       CarbonFile carbonFile = FileFactory.getCarbonFile(partitionPath);
       CarbonFile[] listFiles = carbonFile.listFiles(new CarbonFileFilter() {
         @Override

Reply via email to