ajantha-bhat commented on a change in pull request #3917:
URL: https://github.com/apache/carbondata/pull/3917#discussion_r510707186



##########
File path: 
core/src/main/java/org/apache/carbondata/core/metadata/SegmentFileStore.java
##########
@@ -1105,28 +1109,79 @@ public static void cleanSegments(CarbonTable table, 
List<PartitionSpec> partitio
    * @throws IOException
    */
   public static void deleteSegment(String tablePath, Segment segment,
-      List<PartitionSpec> partitionSpecs,
-      SegmentUpdateStatusManager updateStatusManager) throws Exception {
+      List<PartitionSpec> partitionSpecs, SegmentUpdateStatusManager 
updateStatusManager,
+      SegmentStatus segmentStatus, Boolean isPartitionTable, String timeStamp)
+      throws Exception {
     SegmentFileStore fileStore = new SegmentFileStore(tablePath, 
segment.getSegmentFileName());
     List<String> indexOrMergeFiles = 
fileStore.readIndexFiles(SegmentStatus.SUCCESS, true,
         FileFactory.getConfiguration());
+    List<String> filesToDelete = new ArrayList<>();
     Map<String, List<String>> indexFilesMap = fileStore.getIndexFilesMap();
     for (Map.Entry<String, List<String>> entry : indexFilesMap.entrySet()) {
-      FileFactory.deleteFile(entry.getKey());
+      // Move the file to the trash folder in case the segment status is 
insert in progress
+      if (segmentStatus == SegmentStatus.INSERT_IN_PROGRESS) {
+        if (!isPartitionTable) {
+          TrashUtil.copyDataToTrashFolderByFile(tablePath, entry.getKey(), 
timeStamp +

Review comment:
       Why copy file by file instead of copying the whole segment?
   Multiple interactions with the file system may become a bottleneck for 
concurrent queries. I suggest copying the whole segment at once.
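
   A minimal sketch of a segment-level copy, for illustration only. It uses 
plain java.nio on a local file system rather than CarbonFile/FileFactory, and 
the class name, method name, and target layout (trash root / suffix / segment 
folder name) are hypothetical, not taken from this PR.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper: copies one segment directory into the trash in a single call.
final class SegmentTrashSketch {

  // Returns the directory inside the trash that now holds the copied segment.
  static Path copySegmentToTrash(Path segmentDir, Path trashRoot, String suffix)
      throws IOException {
    Path target = trashRoot.resolve(suffix).resolve(segmentDir.getFileName());
    List<Path> sources;
    // Files.walk visits a directory before its children, so parents are created first.
    try (Stream<Path> paths = Files.walk(segmentDir)) {
      sources = paths.collect(Collectors.toList());
    }
    for (Path src : sources) {
      Path dst = target.resolve(segmentDir.relativize(src).toString());
      if (Files.isDirectory(src)) {
        Files.createDirectories(dst);
      } else {
        Files.copy(src, dst, StandardCopyOption.REPLACE_EXISTING);
      }
    }
    return target;
  }
}
```

   The caller then makes one call per segment instead of one call per index 
or data file. On HDFS or object stores the equivalent would have to go through 
the project's own file abstraction.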
   

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
##########
@@ -47,6 +47,7 @@
   public static final String BATCH_PREFIX = "_batchno";
   private static final String LOCK_DIR = "LockFiles";
 
+  public static final String SEGMENTS_FOLDER = "segments";

Review comment:
       ```suggestion
     public static final String SEGMENTS_METADATA_FOLDER = "segments";
   ```

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide 
necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, 
String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new 
File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the 
trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash 
folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String 
carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new 
File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to 
the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String 
carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] aB = 
carbonFile.getAbsolutePath().split(CarbonCommonConstants.FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new 
Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(aB[aB.length - 1]);
+          // If the timeStamp at which the timeStamp subdirectory has expired 
as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          }

Review comment:
       Add a log stating that there is nothing to delete because the files have not expired.
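
   Roughly what I have in mind, as a sketch only. It uses java.util.logging and 
plain timestamps instead of the project's logger and CarbonFile, and the class 
and method names are hypothetical. The expiry condition is the same one as in 
the diff above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.logging.Logger;

// Hypothetical helper: selects expired timestamp folders and logs when none qualify.
final class TrashExpirySketch {

  private static final Logger LOGGER =
      Logger.getLogger(TrashExpirySketch.class.getName());

  static List<Long> expiredFolders(List<Long> folderTimestamps, long retentionMillis,
      long nowMillis) {
    List<Long> expired = new ArrayList<>();
    for (long folderTime : folderTimestamps) {
      // Same condition as in the diff: givenTime + timeStamp < currentTime.
      if (folderTime + retentionMillis < nowMillis) {
        expired.add(folderTime);
      }
    }
    if (expired.isEmpty()) {
      LOGGER.info("Nothing to delete from the trash folder: "
          + "no timestamp subdirectory has expired yet");
    }
    return expired;
  }
}
```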

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide 
necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, 
String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new 
File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the 
trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash 
folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String 
carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new 
File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to 
the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String 
carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] aB = 
carbonFile.getAbsolutePath().split(CarbonCommonConstants.FILE_SEPARATOR);

Review comment:
       Wouldn't taking a substring be better than splitting the path?
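
   For example, something along these lines. This is a sketch that assumes the 
separator is "/" and that the last path component is the timestamp folder name; 
the class and method names are hypothetical.

```java
// Hypothetical helper: reads the timestamp from the last component of a trash path.
final class TrashPathSketch {

  static long timestampOf(String absolutePath) {
    // Drop one trailing separator, if present, so the last component is not empty.
    String trimmed = absolutePath.endsWith("/")
        ? absolutePath.substring(0, absolutePath.length() - 1)
        : absolutePath;
    // lastIndexOf returns -1 when there is no separator, so the whole string is used.
    String folderName = trimmed.substring(trimmed.lastIndexOf('/') + 1);
    return Long.parseLong(folderName);
  }
}
```

   This avoids allocating an array of all path components when only the last 
one is needed.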

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide 
necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, 
String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new 
File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the 
trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash 
folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String 
carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new 
File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to 
the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String 
carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] aB = 
carbonFile.getAbsolutePath().split(CarbonCommonConstants.FILE_SEPARATOR);

Review comment:
       Also, use a better name for aB.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/constants/CarbonCommonConstants.java
##########
@@ -1427,6 +1428,25 @@ private CarbonCommonConstants() {
 
   public static final String BITSET_PIPE_LINE_DEFAULT = "true";
 
+  public static final long MILLIS_SECONDS_IN_A_DAY = TimeUnit.DAYS.toMillis(1);

Review comment:
       It is used in only one place, and TimeUnit.DAYS.toMillis(1) is very 
readable, so I suggest not defining a constant for it. Just use the expression directly.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/statusmanager/SegmentStatusManager.java
##########
@@ -1049,7 +1049,7 @@ private static ReturnTuple isUpdateRequired(boolean 
isForceDeletion, CarbonTable
   }
 
   public static void deleteLoadsAndUpdateMetadata(CarbonTable carbonTable, 
boolean isForceDeletion,
-      List<PartitionSpec> partitionSpecs) throws IOException {
+      List<PartitionSpec> partitionSpecs, String timeStamp) throws IOException 
{

Review comment:
       The current time can be obtained while moving to trash itself (at the 
beginning of the function), right? I see no need to change all the method 
signatures just for this.
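
   A sketch of the idea, with hypothetical names and a hypothetical folder 
layout (trash root / timestamp / Segment_<no>): the helper that builds the 
trash location takes the time itself, so callers do not need a timeStamp 
parameter.

```java
// Hypothetical helper: builds the trash sub-folder and obtains the time internally.
final class TrashTimestampSketch {

  static String trashSubFolder(String trashRoot, String segmentNo) {
    // Taken once, at the point where the move to trash starts.
    long now = System.currentTimeMillis();
    return trashRoot + "/" + now + "/Segment_" + segmentNo;
  }
}
```

   If several segments cleaned in one operation must share a timestamp folder, 
the value can still be taken once at the top of that operation and reused 
locally, without touching the public signatures.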

##########
File path: docs/dml-of-carbondata.md
##########
@@ -562,3 +563,50 @@ CarbonData DML statements are documented here,which 
includes:
   ```
   CLEAN FILES FOR TABLE carbon_table
   ```
+
+## CLEAN FILES
+
+  Clean files command is used to remove the Compacted and Marked

Review comment:
       Just give a link to * [CLEAN FILES](./cleanfiles.md) here as well.
   

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##########
@@ -2116,6 +2086,20 @@ public int getMaxSIRepairLimit(String dbName, String 
tableName) {
     return Math.abs(Integer.parseInt(thresholdValue));
   }
 
+  /**
+   * The below method returns the microseconds after which the trash folder 
will expire
+   */
+  public long getTrashFolderExpirationTime() {
+    String configuredValue = 
getProperty(CarbonCommonConstants.TRASH_EXPIRATION_DAYS,
+            CarbonCommonConstants.TRASH_EXPIRATION_DAYS_DEFAULT);
+    int result = Integer.parseInt(configuredValue);
+    if (result < 0) {
+      result = Integer.parseInt(TRASH_EXPIRATION_DAYS_DEFAULT);

Review comment:
       Add a warning log.
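
   A sketch of the fallback with a warning. It uses java.util.logging rather 
than the project's logger, and the class name, method name, and the default 
passed in by the caller are hypothetical. Treating a non-numeric value like a 
negative one is my assumption; the diff only handles the negative case.

```java
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

// Hypothetical helper: converts a configured number of days to milliseconds.
final class TrashExpirationSketch {

  private static final Logger LOGGER =
      Logger.getLogger(TrashExpirationSketch.class.getName());

  static long expirationMillis(String configuredDays, int defaultDays) {
    int days;
    try {
      days = Integer.parseInt(configuredDays.trim());
    } catch (NumberFormatException e) {
      days = -1; // assumption: handle unparsable input like an invalid negative value
    }
    if (days < 0) {
      LOGGER.warning("Invalid trash expiration value '" + configuredDays
          + "', falling back to the default of " + defaultDays + " days");
      days = defaultDays;
    }
    // TimeUnit keeps the conversion readable without a dedicated constant.
    return TimeUnit.DAYS.toMillis(days);
  }
}
```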

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide 
necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, 
String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new 
File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the 
trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash 
folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String 
carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new 
File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to 
the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String 
carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] aB = 
carbonFile.getAbsolutePath().split(CarbonCommonConstants.FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new 
Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(aB[aB.length - 1]);
+          // If the timeStamp at which the timeStamp subdirectory has expired 
as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of 
a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // if the trash folder exists delete the contents of the trash folder, if 
it does not exists
+    // create a trash folder

Review comment:
       Update the comments.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/path/TrashUtil.java
##########
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.util.path;
+
+import java.io.File;
+import java.io.IOException;
+import java.sql.Timestamp;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.exception.CarbonFileException;
+import org.apache.carbondata.core.util.CarbonUtil;
+
+import org.apache.commons.io.FileUtils;
+
+import org.apache.log4j.Logger;
+
+public final class TrashUtil {
+
+  private static final Logger LOGGER =
+          LogServiceFactory.getLogService(CarbonUtil.class.getName());
+
+  /**
+   * The below method copies the complete a file to the trash folder. Provide 
necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashFolderByFile(String carbonTablePath, 
String pathOfFileToCopy,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      if (new File(pathOfFileToCopy).exists()) {
+        FileUtils.copyFileToDirectory(new File(pathOfFileToCopy), new 
File(trashFolderPath));
+        LOGGER.info("File: " + pathOfFileToCopy + " successfully copied to the 
trash folder: "
+                + trashFolderPath);
+      }
+    } catch (IOException e) {
+      LOGGER.error("Unable to copy " + pathOfFileToCopy + " to the trash 
folder", e);
+    }
+  }
+
+  /**
+   * The below method copies the complete segment folder to the trash folder. 
Provide necessary
+   * timestamp and the segment number in the suffixToAdd  variable, so that 
the proper folder is
+   * created in the trash folder.
+   */
+  public static void copyDataToTrashBySegment(CarbonFile path, String 
carbonTablePath,
+      String suffixToAdd) {
+    String trashFolderPath = CarbonTablePath.getTrashFolder(carbonTablePath) +
+        CarbonCommonConstants.FILE_SEPARATOR + suffixToAdd;
+    try {
+      FileUtils.copyDirectory(new File(path.getAbsolutePath()), new 
File(trashFolderPath));
+      LOGGER.info("Segment: " + path.getAbsolutePath() + " has been copied to 
the trash folder" +
+          " successfully");
+    } catch (IOException e) {
+      LOGGER.error("Unable to create the trash folder and copy data to it", e);
+    }
+  }
+
+  /**
+   * The below method deletes timestamp subdirectories in the trash folder 
which have expired as
+   * per the user defined expiration time
+   */
+  public static void deleteAllDataFromTrashFolderByTimeStamp(String 
carbonTablePath, Long timeStamp)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // Deleting the timestamp based subdirectories in the trashfolder by the 
given timestamp.
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          String[] aB = 
carbonFile.getAbsolutePath().split(CarbonCommonConstants.FILE_SEPARATOR);
+          Long currentTime = Long.valueOf(new 
Timestamp(System.currentTimeMillis()).getTime());
+          Long givenTime = Long.valueOf(aB[aB.length - 1]);
+          // If the timeStamp at which the timeStamp subdirectory has expired 
as per the user
+          // defined value, delete the complete timeStamp subdirectory
+          if (givenTime + timeStamp < currentTime) {
+            deleteDataFromTrashFolderByFile(carbonFile);
+          }
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folder of 
a carbon table.
+   */
+  public static void deleteAllDataFromTrashFolder(String carbonTablePath)
+          throws IOException {
+    String pathOfTrashFolder = CarbonTablePath.getTrashFolder(carbonTablePath);
+    // if the trash folder exists delete the contents of the trash folder, if 
it does not exists
+    // create a trash folder
+    if (FileFactory.isFileExist(pathOfTrashFolder)) {
+      try {
+        List<CarbonFile> carbonFileList = 
FileFactory.getFolderList(pathOfTrashFolder);
+        for (CarbonFile carbonFile : carbonFileList) {
+          deleteDataFromTrashFolderByFile(carbonFile);
+        }
+      } catch (IOException e) {
+        LOGGER.error("Error during deleting from trash folder", e);
+      }
+    }
+  }
+
+  /**
+   * The below method deletes a specific file in the trash folder.
+   */
+  private static void deleteDataFromTrashFolderByFile(CarbonFile carbonFile) {
+    try {
+      FileFactory.deleteAllCarbonFilesOfDir(carbonFile);

Review comment:
       It is not a specific file but a whole folder, I guess. Update the 
comments and the method header.

##########
File path: docs/dml-of-carbondata.md
##########
@@ -562,3 +563,50 @@ CarbonData DML statements are documented here,which 
includes:
   ```
   CLEAN FILES FOR TABLE carbon_table
   ```
+
+## CLEAN FILES
+
+  Clean files command is used to remove the Compacted and Marked
+  For Delete Segments from the store. Carbondata also supports Trash
+  Folder where all the stale data is moved to after clean files
+  is called
+
+  There are several types of compaction
+
+  ```
+  CLEAN FILES ON TABLE TableName
+  ```
+
+  - **Minor Compaction**

Review comment:
       Explaining what compaction is inside the clean files section is not good. 
This should be in the compaction section.

##########
File path: 
core/src/main/java/org/apache/carbondata/core/util/DeleteLoadFolders.java
##########
@@ -138,8 +143,19 @@ public boolean accept(CarbonFile file) {
               if (filesToBeDeleted.length == 0) {
                 status = true;
               } else {
-
                 for (CarbonFile eachFile : filesToBeDeleted) {
+                  // If the file to be deleted is a carbondata file, index 
file, index merge file
+                  // or a delta file, copy that file to the trash folder.
+                  if 
((eachFile.getName().endsWith(CarbonCommonConstants.FACT_FILE_EXT) ||

Review comment:
       Same comment as above: copy the segment at once.

##########
File path: 
integration/spark/src/main/scala/org/apache/carbondata/cleanfiles/CleanFilesUtil.scala
##########
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.cleanfiles
+
+import java.util
+
+import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
+
+import org.apache.spark.sql.{AnalysisException, CarbonEnv, Row, SparkSession}
+import org.apache.spark.sql.index.CarbonIndexUtil
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.CarbonFile
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.exception.ConcurrentOperationException
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockFactory, CarbonLockUtil, 
ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.{AbsoluteTableIdentifier, 
CarbonMetadata, SegmentFileStore}
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.mutate.CarbonUpdateUtil
+import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, 
SegmentStatus, SegmentStatusManager}
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil}
+import org.apache.carbondata.core.util.path.{CarbonTablePath, TrashUtil}
+import org.apache.carbondata.processing.loading.TableProcessingOperations
+import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+
+object CleanFilesUtil {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * The method deletes all data if forceTableClean <true> and clean garbage 
segment
+   * (MARKED_FOR_DELETE state) if forceTableClean <false>
+   *
+   * @param dbName                 : Database name
+   * @param tableName              : Table name
+   * @param tablePath              : Table path
+   * @param carbonTable            : CarbonTable Object <null> in case of 
force clean
+   * @param forceTableClean        : <true> for force clean it will delete all 
data
+   *                               <false> it will clean garbage segment 
(MARKED_FOR_DELETE state)
+   * @param currentTablePartitions : Hive Partitions  details
+   */
+  def cleanFiles(
+    dbName: String,
+    tableName: String,
+    tablePath: String,
+    timeStamp: String,
+    carbonTable: CarbonTable,
+    forceTableClean: Boolean,
+    currentTablePartitions: Option[Seq[PartitionSpec]] = None,
+    truncateTable: Boolean = false): Unit = {
+    var carbonCleanFilesLock: ICarbonLock = null
+    val absoluteTableIdentifier = if (forceTableClean) {
+      AbsoluteTableIdentifier.from(tablePath, dbName, tableName, tableName)
+    } else {
+      carbonTable.getAbsoluteTableIdentifier
+    }
+    try {
+      val errorMsg = "Clean files request is failed for " +
+        s"$dbName.$tableName" +
+        ". Not able to acquire the clean files lock due to another clean files 
" +
+        "operation is running in the background."
+      // in case of force clean the lock is not required
+      if (forceTableClean) {
+        FileFactory.deleteAllCarbonFilesOfDir(
+          FileFactory.getCarbonFile(absoluteTableIdentifier.getTablePath))
+      } else {
+        carbonCleanFilesLock =
+          CarbonLockUtil
+            .getLockObject(absoluteTableIdentifier, 
LockUsage.CLEAN_FILES_LOCK, errorMsg)
+        if (truncateTable) {
+          SegmentStatusManager.truncateTable(carbonTable)
+        }
+        SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+          carbonTable, true, currentTablePartitions.map(_.asJava).orNull, 
timeStamp)
+        CarbonUpdateUtil.cleanUpDeltaFiles(carbonTable, true)
+        currentTablePartitions match {
+          case Some(partitions) =>
+            SegmentFileStore.cleanSegments(
+              carbonTable,
+              currentTablePartitions.map(_.asJava).orNull,
+              timeStamp,
+              true)
+          case _ =>
+        }
+      }
+    } finally {
+      if (currentTablePartitions.equals(None)) {
+        cleanUpPartitionFoldersRecursively(carbonTable, 
List.empty[PartitionSpec])
+      } else {
+        cleanUpPartitionFoldersRecursively(carbonTable, 
currentTablePartitions.get.toList)
+      }
+
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+
+  /**
+   * delete partition folders recursively
+   *
+   * @param carbonTable
+   * @param partitionSpecList
+   */
+  def cleanUpPartitionFoldersRecursively(carbonTable: CarbonTable,
+      partitionSpecList: List[PartitionSpec]): Unit = {
+    if (carbonTable != null && carbonTable.isHivePartitionTable) {
+      val loadMetadataDetails = SegmentStatusManager
+        .readLoadMetadata(carbonTable.getMetadataPath)
+
+      val carbonFile = FileFactory.getCarbonFile(carbonTable.getTablePath)
+
+      // list all files from table path
+      val listOfDefaultPartFilesIterator = carbonFile.listFiles(true)
+      loadMetadataDetails.foreach { metadataDetail =>
+        if 
(metadataDetail.getSegmentStatus.equals(SegmentStatus.MARKED_FOR_DELETE) &&
+          metadataDetail.getSegmentFile == null) {
+          val loadStartTime: Long = metadataDetail.getLoadStartTime
+          // delete all files of @loadStartTime from table path
+          cleanCarbonFilesInFolder(listOfDefaultPartFilesIterator, 
loadStartTime)
+          partitionSpecList.foreach {
+            partitionSpec =>
+              val partitionLocation = partitionSpec.getLocation
+              // For partition folder outside the tablePath
+              if 
(!partitionLocation.toString.startsWith(carbonTable.getTablePath)) {
+                val partitionCarbonFile = FileFactory
+                  .getCarbonFile(partitionLocation.toString)
+                // list all files from partitionLocation
+                val listOfExternalPartFilesIterator = 
partitionCarbonFile.listFiles(true)
+                // delete all files of @loadStartTime from externalPath
+                cleanCarbonFilesInFolder(listOfExternalPartFilesIterator, 
loadStartTime)
+              }
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   *
+   * @param carbonFiles
+   * @param timestamp
+   */
+  private def cleanCarbonFilesInFolder(carbonFiles: java.util.List[CarbonFile],
+      timestamp: Long): Unit = {
+    carbonFiles.asScala.foreach { carbonFile =>
+        val filePath = carbonFile.getPath
+        val fileName = carbonFile.getName
+        if (CarbonTablePath.DataFileUtil.compareCarbonFileTimeStamp(fileName, 
timestamp)) {
+          FileFactory.deleteFile(filePath)
+        }
+    }
+  }
+
+  /**
+   * The in-progress segments which are in stale state will be marked as 
deleted
+   * when driver is initializing.
+   *
+   * @param databaseLocation
+   * @param dbName
+   */
+  def cleanInProgressSegments(databaseLocation: String, dbName: String, 
timeStamp: String): Unit = {
+    val loaderDriver = CarbonProperties.getInstance().
+      getProperty(CarbonCommonConstants.DATA_MANAGEMENT_DRIVER,
+        CarbonCommonConstants.DATA_MANAGEMENT_DRIVER_DEFAULT).toBoolean
+    if (!loaderDriver) {
+      return
+    }
+    try {
+      if (FileFactory.isFileExist(databaseLocation)) {
+        val file = FileFactory.getCarbonFile(databaseLocation)
+        if (file.isDirectory) {
+          val tableFolders = file.listFiles()
+          tableFolders.foreach { tableFolder =>
+            if (tableFolder.isDirectory) {
+              val tablePath = databaseLocation + 
CarbonCommonConstants.FILE_SEPARATOR +
+               tableFolder.getName
+              val tableUniqueName = CarbonTable.buildUniqueName(dbName, 
tableFolder.getName)
+              val tableStatusFile =
+                CarbonTablePath.getTableStatusFilePath(tablePath)
+              if (FileFactory.isFileExist(tableStatusFile)) {
+                try {
+                  val carbonTable = CarbonMetadata.getInstance
+                    .getCarbonTable(tableUniqueName)
+                  
SegmentStatusManager.deleteLoadsAndUpdateMetadata(carbonTable, true, null,
+                    timeStamp)
+                } catch {
+                  case _: Exception =>
+                    LOGGER.warn(s"Error while cleaning table " + 
s"$tableUniqueName")
+                }
+              }
+            }
+          }
+        }
+      }
+    } catch {
+      case s: java.io.FileNotFoundException =>
+        LOGGER.error(s)
+    }
+  }
+
+  /**
+   * The below method deletes all the files and folders in the trash folders 
of all carbon tables
+   * in all databases
+   */
+  def deleteDataFromTrashFolderInAllTables(sparkSession: SparkSession): Unit = 
{
+    try {
+      val databases = sparkSession.sessionState.catalog.listDatabases()
+      databases.foreach(dbName => {
+        val databaseLocation = CarbonEnv.getDatabaseLocation(dbName, 
sparkSession)
+        if (FileFactory.isFileExist(databaseLocation)) {
+          val file = FileFactory.getCarbonFile(databaseLocation)
+          if (file.isDirectory) {
+            val tableFolders = file.listFiles()
+            tableFolders.foreach { tableFolder =>
+              if (tableFolder.isDirectory) {
+                val tablePath = databaseLocation +
+                  CarbonCommonConstants.FILE_SEPARATOR + tableFolder.getName
+                TrashUtil.deleteAllDataFromTrashFolder(tablePath)
+              }
+            }
+          }
+        }
+      })
+    } catch {
+      case e: Throwable =>
+        // catch all exceptions to avoid failure
+        LogServiceFactory.getLogService(this.getClass.getCanonicalName)

Review comment:
       Just use the existing `LOGGER` here instead of fetching the log service again.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

