kunal642 commented on a change in pull request #4070:
URL: https://github.com/apache/carbondata/pull/4070#discussion_r568530696



##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
     }
 
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()
+      .filter(_.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT))
+    if (deltaFiles.length > 0) {
+      val blockNameToDeltaFilesMap =
+        collection.mutable.Map[String, collection.mutable.ListBuffer[(CarbonFile, String)]]()
+      deltaFiles.foreach { deltaFile =>
+        val tmpDeltaFilePath = deltaFile.getAbsolutePath
+          .replace(CarbonCommonConstants.WINDOWS_FILE_SEPARATOR,
+            CarbonCommonConstants.FILE_SEPARATOR)
+        val deltaFilePathElements = tmpDeltaFilePath.split(CarbonCommonConstants.FILE_SEPARATOR)
+        if (deltaFilePathElements != null && deltaFilePathElements.nonEmpty) {
+          val deltaFileName = deltaFilePathElements(deltaFilePathElements.length - 1)
+          val blockName = CarbonTablePath.DataFileUtil
+            .getBlockNameFromDeleteDeltaFile(deltaFileName)
+          if (blockNameToDeltaFilesMap.contains(blockName)) {
+            blockNameToDeltaFilesMap(blockName) += ((deltaFile, deltaFileName))
+          } else {
+            val deltaFileList = new ListBuffer[(CarbonFile, String)]()
+            deltaFileList += ((deltaFile, deltaFileName))
+            blockNameToDeltaFilesMap.put(blockName, deltaFileList)
+          }
+        }
+      }
+      val segmentUpdateDetails = new util.ArrayList[SegmentUpdateDetails]()
+      val columnCompressor = CompressorFactory.getInstance.getCompressor.getName
+      blockNameToDeltaFilesMap.foreach { entry =>
+        val segmentUpdateDetail = new SegmentUpdateDetails()
+        segmentUpdateDetail.setBlockName(entry._1)
+        segmentUpdateDetail.setActualBlockName(
+          entry._1 + CarbonCommonConstants.POINT + columnCompressor +
+            CarbonCommonConstants.FACT_FILE_EXT)
+        segmentUpdateDetail.setSegmentName(model.getSegmentId)
+        setMinMaxDeltaStampAndDeletedRowCount(entry._2, segmentUpdateDetail)
+        segmentUpdateDetails.add(segmentUpdateDetail)
+      }
+      val timestamp = System.currentTimeMillis().toString
+      val segmentDetails = new util.HashSet[Segment]()
+      segmentDetails.add(model.getSegment)
+      CarbonUpdateUtil.updateSegmentStatus(segmentUpdateDetails, carbonTable, timestamp, false)

Review comment:
       Can we pass a flag like forceWrite to updateSegmentStatus to avoid validating the segment against the tablestatus file? The flag would be true in the add load command when delete delta files are present. That way you can avoid writing the status twice.
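
       A minimal sketch of the idea at the call site (the forceWrite parameter is hypothetical and does not exist on CarbonUpdateUtil.updateSegmentStatus today; the existing last argument is the compaction flag):

```scala
// Hypothetical sketch only: "forceWrite" is a proposed flag, not an existing
// parameter of CarbonUpdateUtil.updateSegmentStatus. The intent is that add
// load could ask updateSegmentStatus to skip validating the segment against
// the tablestatus file and persist the update metadata directly, so the
// status does not have to be written twice.
val forceWrite = deltaFiles.nonEmpty  // true in add load when delete delta files exist
CarbonUpdateUtil.updateSegmentStatus(
  segmentUpdateDetails,
  carbonTable,
  timestamp,
  false,      // isCompaction, unchanged from the current call
  forceWrite) // hypothetical new flag proposed in this comment
```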

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -369,5 +426,64 @@ case class CarbonAddLoadCommand(
     }
   }
 
+  /**
+   * If more than one delete delta file is present for a block, this method picks the delta file
+   * with the highest timestamp, because the default threshold for horizontal compaction is 1.
+   * It is assumed that the horizontal compaction threshold has not been changed from its default
+   * value, so there will always be only one valid delete delta file present for a block.
+   * It also sets the number of deleted rows for the segment.
+   */
+  def setValidDeltaFileAndDeletedRowCount(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails
+      ) : Unit = {
+    var maxDeltaStamp : Long = -1
+    var deletedRowsCount : Long = 0
+    var validDeltaFile : CarbonFile = null
+    deleteDeltaFiles.foreach { deltaFile =>
+      val currentFileTimestamp = CarbonTablePath.DataFileUtil
+        .getTimeStampFromDeleteDeltaFile(deltaFile._2)
+      if (currentFileTimestamp.toLong > maxDeltaStamp) {
+        maxDeltaStamp = currentFileTimestamp.toLong
+        validDeltaFile = deltaFile._1
+      }
+    }
+    val blockDetails =
+      new CarbonDeleteDeltaFileReaderImpl(validDeltaFile.getAbsolutePath).readJson()
+    blockDetails.getBlockletDetails.asScala.foreach { blocklet =>
+      deletedRowsCount = deletedRowsCount + blocklet.getDeletedRows.size()
+    }
+    segmentUpdateDetails.setDeleteDeltaStartTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeleteDeltaEndTimestamp(maxDeltaStamp.toString)
+    segmentUpdateDetails.setDeletedRowsInBlock(deletedRowsCount.toString)
+  }
+
+  /**
+   * Horizontal compaction is not supported for SDK segments, so all delta files are valid.
+   */
+  def readAllDeltaFiles(
+      deleteDeltaFiles : ListBuffer[(CarbonFile, String)],
+      segmentUpdateDetails : SegmentUpdateDetails
+  ) : Unit = {

Review comment:
       Please fix this formatting: move the closing parenthesis and return type up to the previous line. Check the rest of the code for the same issue as well. A formatting sketch follows below.
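
       For illustration, a formatting sketch (imports added only to make the snippet self-contained; the method body is unchanged):

```scala
import scala.collection.mutable.ListBuffer

import org.apache.carbondata.core.datastore.filesystem.CarbonFile
import org.apache.carbondata.core.mutate.SegmentUpdateDetails

// Formatting sketch only: the closing parenthesis and return type stay on
// the same line as the last parameter, and there is no space before colons.
def readAllDeltaFiles(
    deleteDeltaFiles: ListBuffer[(CarbonFile, String)],
    segmentUpdateDetails: SegmentUpdateDetails): Unit = {
  // method body unchanged
}
```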

##########
File path: integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonAddLoadCommand.scala
##########
@@ -294,6 +297,49 @@ case class CarbonAddLoadCommand(
      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
     }
 
+    val deltaFiles = FileFactory.getCarbonFile(segmentPath).listFiles()

Review comment:
       Better to use a CarbonFileFilter to list only the delete delta files. A sketch follows below.
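
       A sketch of that suggestion, assuming the listFiles(CarbonFileFilter) overload on CarbonFile and reusing the segmentPath value already available in the command:

```scala
import org.apache.carbondata.core.constants.CarbonCommonConstants
import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
import org.apache.carbondata.core.datastore.impl.FileFactory

// List only the delete delta files up front instead of listing every file
// in the segment directory and filtering afterwards.
val deltaFiles: Array[CarbonFile] = FileFactory.getCarbonFile(segmentPath)
  .listFiles(new CarbonFileFilter {
    override def accept(file: CarbonFile): Boolean =
      file.getName.endsWith(CarbonCommonConstants.DELETE_DELTA_FILE_EXT)
  })
```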




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

