vikramahuja1001 commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r509135611



##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  private TrashUtil() {
+
+  }
+
+  public static void copyDataToTrashFolder(String carbonTablePath, String 
pathOfFileToCopy,
+                                           String suffixToAdd) throws 
IOException {
+    String trashFolderPath = carbonTablePath + 
CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + 
CarbonCommonConstants.FILE_SEPARATOR
+            + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        if (!FileFactory.isFileExist(trashFolderPath)) {
+          LOGGER.info("Creating Trash folder at:" + trashFolderPath);
+          FileFactory.createDirectoryAndSetPermission(trashFolderPath,
+                  new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.ALL));
+        }
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy),

Review comment:
       Using copy because, if anything crashes while moving files, they cannot 
be recovered. So we copy all the files of a segment first and delete them only 
after the copy succeeds.

##########
File path: 
processing/src/main/java/org/apache/carbondata/processing/loading/TableProcessingOperations.java
##########
@@ -152,6 +123,41 @@ public static void 
deletePartialLoadDataIfExist(CarbonTable carbonTable,
     }
   }
 
+  public static HashMap<CarbonFile, String> 
getStaleSegments(LoadMetadataDetails[] details,

Review comment:
       done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
##########
@@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: 
TruncateTableCommand) extends DataComman
       throw new MalformedCarbonCommandException(
         "Unsupported truncate table with specified partition")
     }
+    val optionList = List.empty[(String, String)]
+
     CarbonCleanFilesCommand(
       databaseNameOp = Option(dbName),
-      tableName = Option(tableName),
+      tableName = Option(tableName), Option(optionList),

Review comment:
       done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/mutation/CarbonTruncateCommand.scala
##########
@@ -45,9 +45,11 @@ case class CarbonTruncateCommand(child: 
TruncateTableCommand) extends DataComman
       throw new MalformedCarbonCommandException(
         "Unsupported truncate table with specified partition")
     }
+    val optionList = List.empty[(String, String)]

Review comment:
       done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonLoadDataCommand.scala
##########
@@ -108,7 +108,7 @@ case class CarbonLoadDataCommand(databaseNameOp: 
Option[String],
     // Delete stale segment folders that are not in table status but are 
physically present in
     // the Fact folder
     LOGGER.info(s"Deleting stale folders if present for table 
$dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    // TableProcessingOperations.deletePartialLoadDataIfExist(table, false)

Review comment:
       done

##########
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/execution/command/management/CarbonInsertIntoWithDf.scala
##########
@@ -90,7 +90,7 @@ case class CarbonInsertIntoWithDf(databaseNameOp: 
Option[String],
     // Delete stale segment folders that are not in table status but are 
physically present in
     // the Fact folder
     LOGGER.info(s"Deleting stale folders if present for table 
$dbName.$tableName")
-    TableProcessingOperations.deletePartialLoadDataIfExist(table, false)
+    // TableProcessingOperations.deletePartialLoadDataIfExist(table, false)

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.hadoop.fs.permission.FsAction;
+import org.apache.hadoop.fs.permission.FsPermission;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  /**
+   * Attribute for Carbon LOGGER
+   */
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  private TrashUtil() {
+
+  }
+
+  public static void copyDataToTrashFolder(String carbonTablePath, String 
pathOfFileToCopy,
+                                           String suffixToAdd) throws 
IOException {
+    String trashFolderPath = carbonTablePath + 
CarbonCommonConstants.FILE_SEPARATOR +
+            CarbonCommonConstants.CARBON_TRASH_FOLDER_NAME + 
CarbonCommonConstants.FILE_SEPARATOR
+            + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        if (!FileFactory.isFileExist(trashFolderPath)) {

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, 
Segment segment,
    * If partition specs are null, then directly delete parent directory in 
locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath,
+                                              String segmentNo, SegmentStatus 
segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segmentNo + 
CarbonCommonConstants.FILE_SEPARATOR +
+                  location.toString().substring(tablePath.length() + 1,
+                          location.toString().length()).split("/")[0]);

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, 
Segment segment,
    * If partition specs are null, then directly delete parent directory in 
locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath,
+                                              String segmentNo, SegmentStatus 
segmentStatus)
+          throws IOException {
     for (String indexOrMergeFile : indexOrMergeFiles) {
       if (null != partitionSpecs) {
         Path location = new Path(indexOrMergeFile);
         boolean exists = pathExistsInPartitionSpec(partitionSpecs, location);
         if (!exists) {
+          // move to trash
+          TrashUtil.copyDataToTrashFolder(tablePath, location.toString(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segmentNo + 
CarbonCommonConstants.FILE_SEPARATOR +
+                  location.toString().substring(tablePath.length() + 1,
+                          location.toString().length()).split("/")[0]);

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String 
DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean 
isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the 
trash folder.

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String 
DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean 
isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the 
trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + 
CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, 
entry.getKey().length()));

Review comment:
       changed the logic

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String 
DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean 
isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the 
trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + 
CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, 
entry.getKey().length()));
+        }
+      }
       FileFactory.deleteFile(entry.getKey());
       for (String file : entry.getValue()) {
         String[] deltaFilePaths =
             updateStatusManager.getDeleteDeltaFilePath(file, 
segment.getSegmentNo());
         for (String deltaFilePath : deltaFilePaths) {
+          // If the file to be deleted is a carbondata file, copy that file to 
the trash folder.

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1106,23 +1107,55 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
    */
   public static void deleteSegment(String tablePath, Segment segment,
       List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      SegmentUpdateStatusManager updateStatusManager, String tableName, String 
DatabaseName,
+                                   SegmentStatus segmentStatus, Boolean 
isPartitionTable)
+          throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
-    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
-        FileFactory.getConfiguration());
+    List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS,
+            true, FileFactory.getConfiguration());
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
+      // If the file to be deleted is a carbondata file, copy that file to the 
trash folder.
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo());
+        } else {
+          TrashUtil.copyDataToTrashFolder(tablePath, entry.getKey(), 
CarbonCommonConstants
+                  .LOAD_FOLDER + segment.getSegmentNo() + 
CarbonCommonConstants.FILE_SEPARATOR +
+                  entry.getKey().substring(tablePath.length() + 1, 
entry.getKey().length()));
+        }
+      }
       FileFactory.deleteFile(entry.getKey());
       for (String file : entry.getValue()) {
         String[] deltaFilePaths =
             updateStatusManager.getDeleteDeltaFilePath(file, 
segment.getSegmentNo());
         for (String deltaFilePath : deltaFilePaths) {
+          // If the file to be deleted is a carbondata file, copy that file to 
the trash folder.
+          if (segmentStatus == SegmentStatus
+                  .INSERT_IN_PROGRESS) {
+            TrashUtil.copyDataToTrashFolder(tablePath, deltaFilePath, 
deltaFilePath
+                    .substring(tablePath.length() + 1, 
deltaFilePath.length()));
+          }
           FileFactory.deleteFile(deltaFilePath);
         }
+        // If the file to be deleted is a carbondata file, copy that file to 
the trash folder.
+        if (file.endsWith(CarbonCommonConstants.FACT_FILE_EXT) && 
segmentStatus ==
+                SegmentStatus.INSERT_IN_PROGRESS) {

Review comment:
       It will always hit, as the carbondata file is in the map.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1057,7 +1058,7 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
               partitionSpecs,
               fileStore.getIndexFilesMap(),
               indexOrMergeFiles,
-              table.getTablePath());
+              table.getTablePath(), segment.getLoadName(), 
segment.getSegmentStatus());

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1136,16 +1169,29 @@ public static void deleteSegment(String tablePath, 
Segment segment,
    * If partition specs are null, then directly delete parent directory in 
locationMap.
    */
   private static void deletePhysicalPartition(List<PartitionSpec> 
partitionSpecs,
-      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath) {
+      Map<String, List<String>> locationMap, List<String> indexOrMergeFiles, 
String tablePath,
+                                              String segmentNo, SegmentStatus 
segmentStatus)

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    boolean canBeDeleted = false;

Review comment:
       ok

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + 
oneLoad.getLoadName() + "It " +

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + 
oneLoad.getLoadName() + "It " +
+                "can be deleted as load is not going on");

Review comment:
       done

##########
File path: core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
##########
@@ -3440,4 +3442,25 @@ public static void agingTempFolderForIndexServer(long 
agingTime)throws
       });
     }
   }
+
+  public static boolean tryGettingSegmentLock(LoadMetadataDetails oneLoad,
+                                              AbsoluteTableIdentifier 
absoluteTableIdentifier) {
+    ICarbonLock segmentLock = 
CarbonLockFactory.getCarbonLockObj(absoluteTableIdentifier,
+            CarbonTablePath.addSegmentPrefix(oneLoad.getLoadName()) + 
LockUsage.LOCK);
+    boolean canBeDeleted = false;
+    try {
+      if (segmentLock.lockWithRetries(1, 5)) {
+        LOGGER.info("Info: Acquired segment lock on segment:" + 
oneLoad.getLoadName() + "It " +
+                "can be deleted as load is not going on");
+        canBeDeleted = true;
+      } else {
+        LOGGER.info("Info: Load in progress for segment" + 
oneLoad.getLoadName());
+        canBeDeleted = false;
+      }
+    } finally {
+      segmentLock.unlock();
+      LOGGER.info("Info: Segment lock on segment:" + oneLoad.getLoadName() + " 
is released");

Review comment:
       done

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, copy that 
file to the trash
+                  // folder.
+                  if 
(eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) &&
+                          oneLoad.getSegmentStatus() == 
SegmentStatus.INSERT_IN_PROGRESS) {
+                    TrashUtil.copyDataToTrashFolder(carbonTable.getTablePath(),
+                            eachFile.getAbsolutePath(), 
eachFile.getAbsolutePath()
+                                    
.substring(carbonTable.getTablePath().length() + 1,
+                                            
eachFile.getAbsolutePath().length()));

Review comment:
       changed logic

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,17 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, copy that 
file to the trash
+                  // folder.
+                  if 
(eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) &&

Review comment:
       added code for that




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to