[jira] [Updated] (CARBONDATA-4075) Should refactor to use withEvents instead of fireEvent

2020-12-06 Thread David Cai (Jira)


 [ 
https://issues.apache.org/jira/browse/CARBONDATA-4075?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Cai updated CARBONDATA-4075:
--
Summary: Should refactor to use withEvents instead of fireEvent  (was: 
Should refactor carbon to use withEvents instead of fireEvent)

> Should refactor to use withEvents instead of fireEvent
> --
>
> Key: CARBONDATA-4075
> URL: https://issues.apache.org/jira/browse/CARBONDATA-4075
> Project: CarbonData
>  Issue Type: Improvement
>Reporter: David Cai
>Priority: Minor
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537294489



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/events/package.scala
##
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata
+
+package object events {
+  def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {

Review comment:
   done
   [Should refactor to use withEvents instead of 
fireEvent](https://issues.apache.org/jira/browse/CARBONDATA-4075)
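   For readers following the thread, a minimal sketch of what such a helper can
   look like, assuming the `OperationListenerBus.fireEvent(event, operationContext)`
   API from the same events package (the committed version may thread the
   `OperationContext` differently):
   
   ```scala
   package org.apache.carbondata
   
   // Event, OperationContext and OperationListenerBus live in the
   // org.apache.carbondata.events package, so no imports are needed here
   package object events {
     def withEvents(preEvent: Event, postEvent: Event)(func: => Unit): Unit = {
       val operationContext = new OperationContext
       OperationListenerBus.getInstance().fireEvent(preEvent, operationContext)
       // run the wrapped operation between the two notifications
       func
       OperationListenerBus.getInstance().fireEvent(postEvent, operationContext)
     }
   }
   ```
   
   A call site can then replace a paired pre/post `fireEvent` with
   `withEvents(preEvent, postEvent) { doWork() }`.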





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (CARBONDATA-4075) Should refactor carbon to use withEvents instead of fireEvent

2020-12-06 Thread David Cai (Jira)
David Cai created CARBONDATA-4075:
-

 Summary: Should refactor carbon to use withEvents instead of 
fireEvent
 Key: CARBONDATA-4075
 URL: https://issues.apache.org/jira/browse/CARBONDATA-4075
 Project: CarbonData
  Issue Type: Improvement
Reporter: David Cai






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537293803



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)

Review comment:
   I see that `moveStaleSegmentsToTrash` and `cleanExpiredSegments` each call 
`SegmentStatusManager.readLoadMetadata`, so the table status is read multiple 
times. Is it possible to combine the two methods so that we read the table 
status once and then, for each segment, move it to trash if it is stale or 
delete it if it is expired?
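   A hedged sketch of that combination; `listSegmentDirs`, `segmentNoOf`,
   `moveToTrash`, `isExpired` and `deleteExpiredSegment` are illustrative
   helpers, not the actual CarbonData methods:
   
   ```scala
   // read the table status only once
   val details = SegmentStatusManager.readLoadMetadata(carbonTable.getMetadataPath)
   val validLoadNames = details.map(_.getLoadName).toSet
   // on-disk segments absent from the metadata are stale: move them to .Trash
   listSegmentDirs(carbonTable)
     .filterNot(dir => validLoadNames.contains(segmentNoOf(dir)))
     .foreach(dir => moveToTrash(carbonTable, dir))
   // metadata entries in expired states (MARKED_FOR_DELETE, Compacted,
   // In Progress) are deleted directly
   details
     .filter(d => isExpired(d, isForceDelete, cleanStaleInProgress))
     .foreach(d => deleteExpiredSegment(carbonTable, d))
   ```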





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537289240



##
File path: 
core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##
@@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) {
 
   }
 
-  /**
-   * Handling of the clean up of old carbondata files, index files , delete 
delta,
-   * update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be 
considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) 
throws IOException {
-
-SegmentStatusManager ssm = new 
SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-LoadMetadataDetails[] details =
-SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
-
-SegmentUpdateStatusManager updateStatusManager = new 
SegmentUpdateStatusManager(table);
-SegmentUpdateDetails[] segmentUpdateDetails = 
updateStatusManager.getUpdateStatusDetails();
-// hold all the segments updated so that we can check the delta files in 
them, no need to
-// check the others.
-Set<String> updatedSegments = new HashSet<>();
-for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
-  updatedSegments.add(updateDetails.getSegmentName());
-}
-
-String validUpdateStatusFile = "";
-
-boolean isAbortedFile = true;
-
-boolean isInvalidFile = false;
-
-// take the update status file name from 0th segment.
-validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-// scan through each segment.
-for (LoadMetadataDetails segment : details) {
-  // if this segment is valid then only we will go for delta file deletion.
-  // if the segment is mark for delete or compacted then any way it will 
get deleted.
-  if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-  || segment.getSegmentStatus() == 
SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-// when there is no update operations done on table, then no need to 
go ahead. So
-// just check the update delta start timestamp and proceed if not empty
-if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
-|| updatedSegments.contains(segment.getLoadName())) {
-  // take the list of files from this segment.
-  String segmentPath = CarbonTablePath.getSegmentPath(
-  table.getAbsoluteTableIdentifier().getTablePath(), 
segment.getLoadName());
-  CarbonFile segDir =
-  FileFactory.getCarbonFile(segmentPath);
-  CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-  // now handle all the delete delta files which needs to be deleted.
-  // there are 2 cases here .
-  // 1. if the block is marked as compacted then the corresponding 
delta files
-  //can be deleted if query exec timeout is done.
-  // 2. if the block is in success state then also there can be delete
-  //delta compaction happened and old files can be deleted.
-
-  SegmentUpdateDetails[] updateDetails = 
updateStatusManager.readLoadMetadata();
-  for (SegmentUpdateDetails block : updateDetails) {
-CarbonFile[] completeListOfDeleteDeltaFiles;
-CarbonFile[] invalidDeleteDeltaFiles;
-
-if 
(!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-  continue;
-}
-
-// aborted scenario.
-invalidDeleteDeltaFiles = updateStatusManager
-.getDeleteDeltaInvalidFilesList(block, false,
-allSegmentFiles, isAbortedFile);
-for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-  boolean doForceDelete = true;
-  compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-}
-
-// case 1
-if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-  completeListOfDeleteDeltaFiles = updateStatusManager
-  .getDeleteDeltaInvalidFilesList(block, true,
-  allSegmentFiles, isInvalidFile);
-  for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-compareTimestampsAndDelete(invalidFile, forceDelete, false);
-  }
-
-} else {
-  invalidDeleteDeltaFiles = updateStatusManager
-  .getDeleteDeltaInvalidFilesList(block, false,
-  allSegmentFiles, isInvalidFile);
-  for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-compareTimestampsAndDelete(invalidFile, forceDelete, false);
-  }
-}
-  }
-}
-// handle cleanup of merge index files and data files after small 
files merge happened for
-// SI table
-

[jira] [Created] (CARBONDATA-4074) Should clean stale data in success segments

2020-12-06 Thread David Cai (Jira)
David Cai created CARBONDATA-4074:
-

 Summary: Should clean stale data in success segments
 Key: CARBONDATA-4074
 URL: https://issues.apache.org/jira/browse/CARBONDATA-4074
 Project: CarbonData
  Issue Type: Improvement
Reporter: David Cai


Cleaning stale data in success segments includes the following parts:

1. clean stale delete delta (when force is true)

2. clean stale small files for index table

3. clean stale data files for loading/compaction



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537288377



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
   LOGGER.info("Data load is successful for " +
   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
 }
-
-// code to handle Pre-Priming cache for loading
-
-if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),
-operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-}
-try {
-  // compaction handling
-  if (carbonTable.isHivePartitionTable) {
-carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-  }
-  val compactedSegments = new util.ArrayList[String]()
-  handleSegmentMerging(sqlContext,
-carbonLoadModel
-  .getCopyWithPartition(carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter),
-carbonTable,
-compactedSegments,
-operationContext)
-  carbonLoadModel.setMergedSegmentIds(compactedSegments)
-  writtenSegment
-} catch {
-  case e: Exception =>
-LOGGER.error(
-  "Auto-Compaction has failed. Ignoring this exception because 
the" +
-  " load is passed.", e)
-writtenSegment
-}
+isLoadingCommitted = true
+writtenSegment
   }
 } finally {
   // Release the segment lock, once table status is finally updated
   segmentLock.unlock()
+  if (isLoadingCommitted) {
+triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, 
operationContext)
+  }
+}
+  }
+
+  private def triggerEventsAfterLoading(
+  sqlContext: SQLContext,
+  carbonLoadModel: CarbonLoadModel,
+  hadoopConf: Configuration,
+  operationContext: OperationContext): Unit = {
+val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+// code to handle Pre-Priming cache for loading
+if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),

Review comment:
   Calling it twice will increase the time. It is better to have logic that 
finds out whether the load was compacted or not and, based on that, sends the 
segments to pre-prime only once.
   
   Also, in `DistributedRDDUtils.scala` (line number 376), a new 
`SegmentUpdateStatusManager` is created but never used; it simply reads the 
table status file and the update status. Please check whether it can be 
removed. Just another input for optimization.
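   A hedged sketch of that single-trigger idea; `compactedSegments` is the list
   populated by `handleSegmentMerging` in the diff above, and the overall
   wiring is illustrative:
   
   ```scala
   import scala.collection.JavaConverters._
   
   // decide the segment list once, then pre-prime a single time
   val segmentsToPrime =
     if (!compactedSegments.isEmpty) {
       // auto-compaction merged the fresh load: prime the merged segments
       compactedSegments.asScala.toList
     } else {
       // no compaction happened: prime only the freshly loaded segment
       List(carbonLoadModel.getSegmentId)
     }
   DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, carbonTable, Seq(),
     operationContext, hadoopConf, segmentsToPrime)
   ```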





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537283710



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)
+  // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In 
Progress)
+  cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, 
partitionSpecs)
+} finally {
+  if (carbonCleanFilesLock != null) {
+CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
+  }
+}
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, 
isForceDelete: Boolean): Unit = {
+if (isForceDelete) {
+  // empty the trash folder
+  TrashUtil.emptyTrash(carbonTable.getTablePath)
+} else {
+  // clear trash based on timestamp
+  TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+}
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment

Review comment:
   ```suggestion
   * move stale segment to trash folder, but not include stale compaction (x.y) segment
   ```
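   For context, stale compaction segments carry two-part load names such as
   "0.1" (x.y), so the exclusion can be a filter along these lines; the exact
   predicate is an assumption, with `segmentFiles` and `loadNameSet` as in the
   `CleanFilesUtil` diff later in this thread:
   
   ```scala
   import scala.collection.JavaConverters._
   
   // keep only stale, non-compaction segment files: a compaction segment's
   // number contains a dot ("x.y"), so it is skipped here
   val staleSegments = segmentFiles.asScala.filter { segmentFile =>
     val segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(segmentFile)
     !loadNameSet.contains(segmentNo) && !segmentNo.contains(".")
   }
   ```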





[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281516



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
   LOGGER.info("Data load is successful for " +
   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
 }
-
-// code to handle Pre-Priming cache for loading
-
-if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),
-operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-}
-try {
-  // compaction handling
-  if (carbonTable.isHivePartitionTable) {
-carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-  }
-  val compactedSegments = new util.ArrayList[String]()
-  handleSegmentMerging(sqlContext,
-carbonLoadModel
-  .getCopyWithPartition(carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter),
-carbonTable,
-compactedSegments,
-operationContext)
-  carbonLoadModel.setMergedSegmentIds(compactedSegments)
-  writtenSegment
-} catch {
-  case e: Exception =>
-LOGGER.error(
-  "Auto-Compaction has failed. Ignoring this exception because 
the" +
-  " load is passed.", e)
-writtenSegment
-}
+isLoadingCommitted = true
+writtenSegment
   }
 } finally {
   // Release the segment lock, once table status is finally updated
   segmentLock.unlock()
+  if (isLoadingCommitted) {
+triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, 
operationContext)
+  }
+}
+  }
+
+  private def triggerEventsAfterLoading(
+  sqlContext: SQLContext,
+  carbonLoadModel: CarbonLoadModel,
+  hadoopConf: Configuration,
+  operationContext: OperationContext): Unit = {
+val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+// code to handle Pre-Priming cache for loading
+if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),

Review comment:
   OK, we can do it in another PR, since compaction may not compact this 
segment.
   Pre-priming is needed here as well.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537282367



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)
+  // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In 
Progress)
+  cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, 
partitionSpecs)

Review comment:
   ```suggestion
  checkAndCleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537281123



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
 throw new Exception("Exception in compaction " + 
exception.getMessage)
   }
 } finally {
-  executor.shutdownNow()
-  try {
-compactor.deletePartialLoadsInCompaction()

Review comment:
   Already added code to handle the compaction exception instead of this 
function. This function lists the whole table to clean partial loads; the new 
code focuses only on the current load.
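   A hedged sketch of that replacement; `executeCompaction`, `mergedLoadName`
   and `cleanStaleSegment` are illustrative names for the compaction entry
   point, the merged segment of the current compaction, and a targeted cleanup
   helper:
   
   ```scala
   try {
     // run the compaction for the current load
     compactor.executeCompaction()
   } catch {
     case exception: Exception =>
       // focus on the current load: drop only the partially written merged
       // segment instead of listing the whole table for partial loads
       cleanStaleSegment(carbonTable, mergedLoadName)
       throw new Exception("Exception in compaction " + exception.getMessage)
   }
   ```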





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ydvpankaj99 commented on pull request #4045: ci_test

2020-12-06 Thread GitBox


ydvpankaj99 commented on pull request #4045:
URL: https://github.com/apache/carbondata/pull/4045#issuecomment-739724037


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278313



##
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out

Review comment:
   done

##
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out
+   */
+  private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+int retentionDays = 
CarbonProperties.getInstance().getTrashFolderRetentionTime();
+long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays;
+return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > 
retentionMilliSeconds;
+  }
+
+  /**
+   * whether trash data outside of .Trash folder is time out
*/
-  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
-// record current time.
-long currentTime = CarbonUpdateUtil.readCurrentTime();
-long retentionMilliSeconds = 
(long)Integer.parseInt(CarbonProperties.getInstance()
-.getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, 
Integer.toString(
-  CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * 
TimeUnit.DAYS
-.toMillis(1);
-long difference = currentTime - fileTimestamp;
-return difference > retentionMilliSeconds;
+  public static boolean isTrashDataTimeout(long fileTimestamp) {

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537278197



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {
+val indexTables = CarbonIndexUtil
+  .getIndexCarbonTables(carbonTable, sparkSession)
+indexTables.foreach { indexTable =>
+  val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+Seq.empty[Expression],
+sparkSession,
+indexTable)
+  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
cleanStaleInProgress,
+true)
+  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+}
+  }
+
+  private def cleanFilesForMv(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  options: Map[String, String]
+  ): Unit = {
+val viewSchemas = 
MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
+if (!viewSchemas.isEmpty) {
+  viewSchemas.asScala.map { schema =>

Review comment:
   Fixed. The following command was using the wrong table; changed it to use 
this variable: `schema`.
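   A hedged sketch of the corrected loop; `schema.getIdentifier` with
   `getDatabaseName`/`getTableName` follows the MV schema API loosely, and
   `runCleanFilesOn` is a hypothetical stand-in for issuing the clean files
   command:
   
   ```scala
   import scala.collection.JavaConverters._
   
   val viewSchemas = MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
   viewSchemas.asScala.foreach { schema =>
     // use the MV's own identifier, not the parent carbonTable's
     val mvIdentifier = schema.getIdentifier
     runCleanFilesOn(sparkSession, mvIdentifier.getDatabaseName,
       mvIdentifier.getTableName, options) // hypothetical helper
   }
   ```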





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277683



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {
+val indexTables = CarbonIndexUtil
+  .getIndexCarbonTables(carbonTable, sparkSession)
+indexTables.foreach { indexTable =>
+  val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+Seq.empty[Expression],
+sparkSession,
+indexTable)
+  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
cleanStaleInProgress,
+true)
+  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+}
+  }
+
+  private def cleanFilesForMv(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  options: Map[String, String]
+  ): Unit = {

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277613



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277508



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)
+  // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In 
Progress)
+  cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, 
partitionSpecs)
+} finally {
+  if (carbonCleanFilesLock != null) {
+CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
+  }
+}
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, 
isForceDelete: Boolean): Unit = {
+if (isForceDelete) {
+  // empty the trash folder
+  TrashUtil.emptyTrash(carbonTable.getTablePath)
+} else {
+  // clear trash based on timestamp
+  TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+}
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277390



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)
+  // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In 
Progress)
+  cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, 
partitionSpecs)
+} finally {
+  if (carbonCleanFilesLock != null) {
+CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
+  }
+}
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, 
isForceDelete: Boolean): Unit = {
+if (isForceDelete) {
+  // empty the trash folder
+  TrashUtil.emptyTrash(carbonTable.getTablePath)
+} else {
+  // clear trash based on timestamp
+  TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+}
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537277121



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String 
tableName) {
* folder will take place
*/
   private void validateTrashFolderRetentionTime() {
-String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+String propertyValue = carbonProperties.getProperty(

Review comment:
   done

##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable 
carbonTable, List s
 }
 Set loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
 .collect(Collectors.toSet());
-List staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
-
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+// get all stale segment files, not include compaction segments

Review comment:
   done

##
File path: docs/clean-files.md
##
@@ -38,6 +38,9 @@ The above clean files command will clean Marked For Delete 
and Compacted segment
``` 
   Once the timestamp subdirectory is expired as per the configured expiration 
day value, that subdirectory is deleted from the trash folder in the subsequent 
clean files command.
 
+**NOTE**:
+  * In trash folder, the retention time is "carbon.trash.retention.days"
+  * Outside trash folder, the retention time is max value of two 
properties("carbon.trash.retention.days", "max.query.execution.time")

Review comment:
   done
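
   As a concrete illustration of the two retention rules in the doc note quoted above, here is a minimal sketch; it is not CarbonData source, the constants are illustrative, and the unit of `max.query.execution.time` is assumed to be minutes.

```scala
import java.util.concurrent.TimeUnit

object RetentionRuleSketch {
  // carbon.trash.retention.days (illustrative value)
  val trashRetentionMillis: Long = TimeUnit.DAYS.toMillis(7)
  // max.query.execution.time (assumed to be configured in minutes)
  val maxQueryExecutionMillis: Long = TimeUnit.MINUTES.toMillis(60)

  // inside the .Trash folder, only the trash retention time applies
  def isExpiredInsideTrash(fileTimestamp: Long, now: Long): Boolean =
    now - fileTimestamp > trashRetentionMillis

  // outside the .Trash folder, the max of the two properties applies
  def isExpiredOutsideTrash(fileTimestamp: Long, now: Long): Boolean =
    now - fileTimestamp > math.max(trashRetentionMillis, maxQueryExecutionMillis)
}
```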





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276344



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
 throw new Exception("Exception in compaction " + 
exception.getMessage)
   }
 } finally {
-  executor.shutdownNow()
-  try {
-compactor.deletePartialLoadsInCompaction()

Review comment:
   OK, I see that many changes were made to support this 
`DataTrashManager.cleanStaleCompactionSegment`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537276094



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable 
carbonTable, List s
 }
 Set loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
 .collect(Collectors.toSet());
-List staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
-
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+// get all stale segment files, not include compaction segments
+List staleSegments = segmentFiles.stream()
+.filter(segmentFile -> 
!DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+CarbonCommonConstants.POINT))
+.filter(segmentFile -> !loadNameSet.contains(

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275990



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, 
CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, 
LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, 
CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = 
LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments(MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDeleteclean the MFD/Compacted segments immediately 
and empty trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on 
retention time,
+   * it will clean immediately when force is true
+   */
+  def cleanGarbageData(
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean,
+  partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+// if isForceDelete = true need to throw exception if 
CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+if (isForceDelete && 
!CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+  LOGGER.error("Clean Files with Force option deletes the physical data 
and it cannot be" +
+" recovered. It is disabled by default, to enable clean files with 
force option," +
+" set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " 
to true")
+  throw new RuntimeException("Clean files with force operation not 
permitted by default")
+}
+var carbonCleanFilesLock: ICarbonLock = null
+try {
+  val errorMsg = "Clean files request is failed for " +
+s"${ carbonTable.getQualifiedName }" +
+". Not able to acquire the clean files lock due to another clean files 
" +
+"operation is running in the background."
+  carbonCleanFilesLock = 
CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+LockUsage.CLEAN_FILES_LOCK, errorMsg)
+  // step 1: check and clean trash folder
+  checkAndCleanTrashFolder(carbonTable, isForceDelete)
+  // step 2: move stale segments which are not exists in metadata into 
.Trash
+  moveStaleSegmentsToTrash(carbonTable)
+  // step 3: clean expired segments(MARKED_FOR_DELETE, Compacted, In 
Progress)
+  cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, 
partitionSpecs)
+} finally {
+  if (carbonCleanFilesLock != null) {
+CarbonLockUtil.fileUnlock(carbonCleanFilesLock, 
LockUsage.CLEAN_FILES_LOCK)
+  }
+}
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, 
isForceDelete: Boolean): Unit = {
+if (isForceDelete) {
+  // empty the trash folder
+  TrashUtil.emptyTrash(carbonTable.getTablePath)
+} else {
+  // clear trash based on timestamp
+  TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+}
+  }
+
+  /**
+   * move stale segment to trash folder, but not include compaction segment
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537275885



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
   LOGGER.info("Data load is successful for " +
   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
 }
-
-// code to handle Pre-Priming cache for loading
-
-if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),
-operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-}
-try {
-  // compaction handling
-  if (carbonTable.isHivePartitionTable) {
-carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-  }
-  val compactedSegments = new util.ArrayList[String]()
-  handleSegmentMerging(sqlContext,
-carbonLoadModel
-  .getCopyWithPartition(carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter),
-carbonTable,
-compactedSegments,
-operationContext)
-  carbonLoadModel.setMergedSegmentIds(compactedSegments)
-  writtenSegment
-} catch {
-  case e: Exception =>
-LOGGER.error(
-  "Auto-Compaction has failed. Ignoring this exception because 
the" +
-  " load is passed.", e)
-writtenSegment
-}
+isLoadingCommitted = true
+writtenSegment
   }
 } finally {
   // Release the segment lock, once table status is finally updated
   segmentLock.unlock()
+  if (isLoadingCommitted) {
+triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, 
operationContext)
+  }
+}
+  }
+
+  private def triggerEventsAfterLoading(
+  sqlContext: SQLContext,
+  carbonLoadModel: CarbonLoadModel,
+  hadoopConf: Configuration,
+  operationContext: OperationContext): Unit = {
+val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+// code to handle Pre-Priming cache for loading
+if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),

Review comment:
   If auto-compaction fails, the current load still passes, so we can trigger pre-priming.

   The reason I suggested doing it after the auto-compaction code is the success case: there is no need to pre-prime the current segment then, because it becomes MFD once it goes through auto-compaction. So we may save pre-priming one segment.
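
   A self-contained sketch of that ordering, with hypothetical helpers standing in for `handleSegmentMerging` and `DistributedRDDUtils.triggerPrepriming` (placeholder logic, not the CarbonData implementation):

```scala
object PrePrimingOrderSketch {
  // stands in for handleSegmentMerging: returns the merged segment id when
  // auto-compaction succeeds, None when it fails or is not triggered
  def runAutoCompaction(loadedSegment: String): Option[String] =
    Some(loadedSegment + ".1")

  // stands in for DistributedRDDUtils.triggerPrepriming
  def triggerPrepriming(segments: List[String]): Unit =
    println(s"pre-priming cache for segments: $segments")

  def afterLoad(loadedSegment: String): Unit = {
    val segmentsToPrime = runAutoCompaction(loadedSegment) match {
      // the loaded segment became Compacted/MFD, prime the merged one instead
      case Some(mergedSegment) => List(mergedSegment)
      // compaction failed or was skipped; the load still passed, so prime it
      case None => List(loadedSegment)
    }
    triggerPrepriming(segmentsToPrime)
  }
}
```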





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717520


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5097/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739717393


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3316/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4045: ci_test

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4045:
URL: https://github.com/apache/carbondata/pull/4045#issuecomment-739717207


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5096/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273944



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable 
carbonTable, List s
 }
 Set loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
 .collect(Collectors.toSet());
-List staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
-
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+// get all stale segment files, not include compaction segments

Review comment:
   done





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537273799



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##
@@ -19,13 +19,13 @@ package org.apache.carbondata.api
 
 import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
 import java.time.{Duration, Instant}
-import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
+import java.util

Review comment:
   reverted





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4045: ci_test

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4045:
URL: https://github.com/apache/carbondata/pull/4045#issuecomment-739716584


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3315/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739715250


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5074/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4039: [WIP] Refactor and Fix Insert into partition issue with FileMergeSortComparator

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4039:
URL: https://github.com/apache/carbondata/pull/4039#issuecomment-739714799


   Build Success with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5094/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 opened a new pull request #4045: ci_test

2020-12-06 Thread GitBox


CarbonDataQA2 opened a new pull request #4045:
URL: https://github.com/apache/carbondata/pull/4045


### Why is this PR needed?
CI test

### What changes were proposed in this PR?
   Nothing
   
### Does this PR introduce any user interface change?
- No

   
### Is any new testcase added?
- No

   
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537271584



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
 throw new Exception("Exception in compaction " + 
exception.getMessage)
   }
 } finally {
-  executor.shutdownNow()
-  try {
-compactor.deletePartialLoadsInCompaction()

Review comment:
   Better to add a proper description in the PR and handle it here only, instead of handling it again in another PR; that keeps the review easy and avoids duplicate work. Should be fine, @ajantha-bhat?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537271258



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
   LOGGER.info("Data load is successful for " +
   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
 }
-
-// code to handle Pre-Priming cache for loading
-
-if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),
-operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-}
-try {
-  // compaction handling
-  if (carbonTable.isHivePartitionTable) {
-carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-  }
-  val compactedSegments = new util.ArrayList[String]()
-  handleSegmentMerging(sqlContext,
-carbonLoadModel
-  .getCopyWithPartition(carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter),
-carbonTable,
-compactedSegments,
-operationContext)
-  carbonLoadModel.setMergedSegmentIds(compactedSegments)
-  writtenSegment
-} catch {
-  case e: Exception =>
-LOGGER.error(
-  "Auto-Compaction has failed. Ignoring this exception because 
the" +
-  " load is passed.", e)
-writtenSegment
-}
+isLoadingCommitted = true
+writtenSegment
   }
 } finally {
   // Release the segment lock, once table status is finally updated
   segmentLock.unlock()
+  if (isLoadingCommitted) {
+triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, 
operationContext)
+  }
+}
+  }
+
+  private def triggerEventsAfterLoading(
+  sqlContext: SQLContext,
+  carbonLoadModel: CarbonLoadModel,
+  hadoopConf: Configuration,
+  operationContext: OperationContext): Unit = {
+val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+// code to handle Pre-Priming cache for loading
+if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),

Review comment:
   Yes, agree with @ajantha-bhat: if auto-compaction succeeds, pre-prime that compacted segment; otherwise just pre-prime the loaded segment.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537270696



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String 
tableName) {
* folder will take place
*/
   private void validateTrashFolderRetentionTime() {
-String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+String propertyValue = carbonProperties.getProperty(

Review comment:
   @ajantha-bhat the `getTrashFolderRetentionTime` method implementation is just `Integer.parseInt(carbonProperties.getProperty(...))`, so it is the same thing here, right? What exactly do you mean by "it is already stored", not validation?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537266114



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -577,38 +556,46 @@ object CarbonDataRDDFactory {
   LOGGER.info("Data load is successful for " +
   s"${ carbonLoadModel.getDatabaseName }.${ 
carbonLoadModel.getTableName }")
 }
-
-// code to handle Pre-Priming cache for loading
-
-if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
-  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),
-operationContext, hadoopConf, List(carbonLoadModel.getSegmentId))
-}
-try {
-  // compaction handling
-  if (carbonTable.isHivePartitionTable) {
-carbonLoadModel.setFactTimeStamp(System.currentTimeMillis())
-  }
-  val compactedSegments = new util.ArrayList[String]()
-  handleSegmentMerging(sqlContext,
-carbonLoadModel
-  .getCopyWithPartition(carbonLoadModel.getCsvHeader, 
carbonLoadModel.getCsvDelimiter),
-carbonTable,
-compactedSegments,
-operationContext)
-  carbonLoadModel.setMergedSegmentIds(compactedSegments)
-  writtenSegment
-} catch {
-  case e: Exception =>
-LOGGER.error(
-  "Auto-Compaction has failed. Ignoring this exception because 
the" +
-  " load is passed.", e)
-writtenSegment
-}
+isLoadingCommitted = true
+writtenSegment
   }
 } finally {
   // Release the segment lock, once table status is finally updated
   segmentLock.unlock()
+  if (isLoadingCommitted) {
+triggerEventsAfterLoading(sqlContext, carbonLoadModel, hadoopConf, 
operationContext)
+  }
+}
+  }
+
+  private def triggerEventsAfterLoading(
+  sqlContext: SQLContext,
+  carbonLoadModel: CarbonLoadModel,
+  hadoopConf: Configuration,
+  operationContext: OperationContext): Unit = {
+val carbonTable = carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable
+// code to handle Pre-Priming cache for loading
+if (!StringUtils.isEmpty(carbonLoadModel.getSegmentId)) {
+  DistributedRDDUtils.triggerPrepriming(sqlContext.sparkSession, 
carbonTable, Seq(),

Review comment:
   Is it better to trigger pre-priming after the auto-compaction code? @akashrn5





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537264650



##
File path: 
integration/spark/src/main/scala/org/apache/carbondata/spark/rdd/CarbonDataRDDFactory.scala
##
@@ -263,15 +249,7 @@ object CarbonDataRDDFactory {
 throw new Exception("Exception in compaction " + 
exception.getMessage)
   }
 } finally {
-  executor.shutdownNow()
-  try {
-compactor.deletePartialLoadsInCompaction()

Review comment:
   In this file, other than removing the `deletePartialLoadsInCompaction`, 
rest of the changes are nothing to do with clean files PR. can you raise a 
separate PR for this? 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537260678



##
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out
+   */
+  private static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
+int retentionDays = 
CarbonProperties.getInstance().getTrashFolderRetentionTime();
+long retentionMilliSeconds = TimeUnit.DAYS.toMillis(1) * retentionDays;
+return CarbonUpdateUtil.readCurrentTime() - fileTimestamp > 
retentionMilliSeconds;
+  }
+
+  /**
+   * whether trash data outside of .Trash folder is time out
*/
-  public static boolean isTrashRetentionTimeoutExceeded(long fileTimestamp) {
-// record current time.
-long currentTime = CarbonUpdateUtil.readCurrentTime();
-long retentionMilliSeconds = 
(long)Integer.parseInt(CarbonProperties.getInstance()
-.getProperty(CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS, 
Integer.toString(
-  CarbonCommonConstants.CARBON_TRASH_RETENTION_DAYS_DEFAULT))) * 
TimeUnit.DAYS
-.toMillis(1);
-long difference = currentTime - fileTimestamp;
-return difference > retentionMilliSeconds;
+  public static boolean isTrashDataTimeout(long fileTimestamp) {

Review comment:
   ```suggestion
 public static boolean isDataOutsideTrashIsExpired(long fileTimestamp) {
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537259682



##
File path: core/src/main/java/org/apache/carbondata/core/util/TrashUtil.java
##
@@ -192,20 +192,20 @@ public static void emptyTrash(String tablePath) {
   }
 
   /**
-   * This will tell whether the trash retention time has expired or not
-   *
-   * @param fileTimestamp
-   * @return
+   * whether trash data inside of .Trash folder is time out

Review comment:
   ```suggestion
  * check If the fileTimestamp is expired based on 
`carbon.trash.retention.days`
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537258139



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CarbonProperties.java
##
@@ -2123,29 +2123,35 @@ public int getMaxSIRepairLimit(String dbName, String 
tableName) {
* folder will take place
*/
   private void validateTrashFolderRetentionTime() {
-String propertyValue = carbonProperties.getProperty(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS, Integer.toString(CarbonCommonConstants
-.CARBON_TRASH_RETENTION_DAYS_DEFAULT));
+String propertyValue = carbonProperties.getProperty(

Review comment:
   @akashrn5: `getTrashFolderRetentionTime` is used to access the already validated and stored value; it won't check for parsing errors because the value is validated when it is set. So the current change looks OK to me as the first-time validation when the property is added.
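
   A minimal sketch of that validate-on-write pattern (assumed names, defaults, and bounds; not `CarbonProperties` itself): the value is parsed and range-checked once when it is set, so the accessor just returns the cached value.

```scala
object PropertyStoreSketch {
  private val DefaultRetentionDays = 7   // stands in for CARBON_TRASH_RETENTION_DAYS_DEFAULT
  private val MaxRetentionDays = 365     // assumed upper bound, for illustration only

  private var trashRetentionDays: Int = DefaultRetentionDays

  // write path: parse and range-check once, fall back to the default on error
  def setTrashRetentionDays(raw: String): Unit = {
    trashRetentionDays =
      try {
        val days = raw.trim.toInt
        if (days < 0 || days > MaxRetentionDays) DefaultRetentionDays else days
      } catch {
        case _: NumberFormatException => DefaultRetentionDays
      }
  }

  // read path: returns the cached value, no re-parsing and no re-validation
  def getTrashFolderRetentionTime: Int = trashRetentionDays
}
```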





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 closed pull request #4014: qa test

2020-12-06 Thread GitBox


CarbonDataQA2 closed pull request #4014:
URL: https://github.com/apache/carbondata/pull/4014


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 closed pull request #4016: Qa test2

2020-12-06 Thread GitBox


CarbonDataQA2 closed pull request #4016:
URL: https://github.com/apache/carbondata/pull/4016


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252858



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {
+val indexTables = CarbonIndexUtil
+  .getIndexCarbonTables(carbonTable, sparkSession)
+indexTables.foreach { indexTable =>
+  val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+Seq.empty[Expression],
+sparkSession,
+indexTable)
+  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
cleanStaleInProgress,
+true)
+  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+}
+  }
+
+  private def cleanFilesForMv(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  options: Map[String, String]
+  ): Unit = {
+val viewSchemas = 
MVManagerInSpark.get(sparkSession).getSchemasOnTable(carbonTable)
+if (!viewSchemas.isEmpty) {
+  viewSchemas.asScala.map { schema =>

Review comment:
   replace `schema` with a placeholder as it's not used
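
   For reference, the placeholder idiom being suggested, shown on hypothetical data:

```scala
val viewSchemas = Seq("mv1", "mv2")
// `_` instead of a named `schema` parameter signals that the value is unused;
// foreach would also fit here, since the result of the lambda is discarded
viewSchemas.map { _ => println("clean files for one mv table") }
```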





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739689562


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3314/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252530



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {
+val indexTables = CarbonIndexUtil
+  .getIndexCarbonTables(carbonTable, sparkSession)
+indexTables.foreach { indexTable =>
+  val partitions: Option[Seq[PartitionSpec]] = CarbonFilters.getPartitions(
+Seq.empty[Expression],
+sparkSession,
+indexTable)
+  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
+indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
cleanStaleInProgress,
+true)
+  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
+}
+  }
+
+  private def cleanFilesForMv(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  options: Map[String, String]
+  ): Unit = {

Review comment:
   same as above





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537252406



##
File path: 
integration/spark/src/main/scala/org/apache/spark/sql/secondaryindex/events/CleanFilesPostEventListener.scala
##
@@ -48,30 +50,61 @@ class CleanFilesPostEventListener extends 
OperationEventListener with Logging {
 event match {
   case cleanFilesPostEvent: CleanFilesPostEvent =>
 LOGGER.info("Clean files post event listener called")
-val carbonTable = cleanFilesPostEvent.carbonTable
-val indexTables = CarbonIndexUtil
-  .getIndexCarbonTables(carbonTable, cleanFilesPostEvent.sparkSession)
-val isForceDelete = cleanFilesPostEvent.ifForceDelete
-val inProgressSegmentsClean = cleanFilesPostEvent.cleanStaleInProgress
-indexTables.foreach { indexTable =>
-  val partitions: Option[Seq[PartitionSpec]] = 
CarbonFilters.getPartitions(
-Seq.empty[Expression],
-cleanFilesPostEvent.sparkSession,
-indexTable)
-  SegmentStatusManager.deleteLoadsAndUpdateMetadata(
-  indexTable, isForceDelete, partitions.map(_.asJava).orNull, 
inProgressSegmentsClean,
-true)
-  CarbonUpdateUtil.cleanUpDeltaFiles(indexTable, true)
-  cleanUpUnwantedSegmentsOfSIAndUpdateMetadata(indexTable, carbonTable)
-}
+cleanFilesForIndex(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options.getOrElse("force", "false").toBoolean,
+  cleanFilesPostEvent.options.getOrElse("stale_inprogress", 
"false").toBoolean)
+
+cleanFilesForMv(
+  cleanFilesPostEvent.sparkSession,
+  cleanFilesPostEvent.carbonTable,
+  cleanFilesPostEvent.options)
+}
+  }
+
+  private def cleanFilesForIndex(
+  sparkSession: SparkSession,
+  carbonTable: CarbonTable,
+  isForceDelete: Boolean,
+  cleanStaleInProgress: Boolean
+  ): Unit = {

Review comment:
   move this line above





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739688814


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5095/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4039: [WIP] Refactor and Fix Insert into partition issue with FileMergeSortComparator

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4039:
URL: https://github.com/apache/carbondata/pull/4039#issuecomment-739688329


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3338/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4039: [WIP] Refactor and Fix Insert into partition issue with FileMergeSortComparator

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4039:
URL: https://github.com/apache/carbondata/pull/4039#issuecomment-739688118


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5073/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ydvpankaj99 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ydvpankaj99 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739687768


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4032: [WIP][CARBONDATA-4065] Support MERGE INTO SQL Command

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4032:
URL: https://github.com/apache/carbondata/pull/4032#issuecomment-739687365


   Build Success with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5072/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4032: [WIP][CARBONDATA-4065] Support MERGE INTO SQL Command

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4032:
URL: https://github.com/apache/carbondata/pull/4032#issuecomment-739687262







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739687261


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3336/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537248838



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable 
carbonTable, List s
 }
 Set loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
 .collect(Collectors.toSet());
-List staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
-
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+// get all stale segment files, not include compaction segments

Review comment:
   please add a detailed comment about why we need to ignore stale compacted (x.y) segments. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] ajantha-bhat commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537248198



##
File path: 
core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable 
carbonTable, List s
 }
 Set loadNameSet = 
Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
 .collect(Collectors.toSet());
-List staleSegments = segmentFiles.stream().filter(segmentFile -> 
!loadNameSet.contains(
-
DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+// get all stale segment files, not include compaction segments
+List staleSegments = segmentFiles.stream()
+.filter(segmentFile -> 
!DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+CarbonCommonConstants.POINT))
+.filter(segmentFile -> !loadNameSet.contains(

Review comment:
   Agree with @kunal642; also, call `DataFileUtil.getSegmentNoFromSegmentFile` only once per segment.
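
   A Scala sketch of the combined suggestion, extracting the segment number a single time per file and applying both conditions to it (the helper is a placeholder, not `DataFileUtil`):

```scala
object StaleSegmentFilterSketch {
  // stands in for DataFileUtil.getSegmentNoFromSegmentFile (placeholder logic)
  def getSegmentNo(segmentFile: String): String = segmentFile.takeWhile(_ != '_')

  def staleSegments(segmentFiles: Seq[String], loadNames: Set[String]): Seq[String] =
    segmentFiles.filter { segmentFile =>
      val segmentNo = getSegmentNo(segmentFile)  // extracted once per file
      // "." marks a compacted (x.y) segment number, as in CarbonCommonConstants.POINT
      !segmentNo.contains(".") && !loadNames.contains(segmentNo)
    }
}
```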





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537245787



##
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean the .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty the trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time;
+   *                             they are cleaned immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true, an exception must be thrown when CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default. To enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Unable to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537245017



##
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean the .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty the trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time;
+   *                             they are cleaned immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true, an exception must be thrown when CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default. To enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Unable to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+  
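
As a side note on the force path quoted above: force deletion stays blocked until the
guard property is enabled. A sketch, using only the constant and classes already shown
in the diff:

    import org.apache.carbondata.core.constants.CarbonCommonConstants
    import org.apache.carbondata.core.util.CarbonProperties

    // sketch: opt in to force deletion before running clean files with force;
    // this flips the guard that cleanGarbageData checks on its force path
    CarbonProperties.getInstance()
      .addProperty(CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED, "true")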

[GitHub] [carbondata] akashrn5 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


akashrn5 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230858



##
File path: core/src/main/java/org/apache/carbondata/core/mutate/CarbonUpdateUtil.java
##
@@ -482,176 +482,6 @@ public boolean accept(CarbonFile file) {
 
   }
 
-  /**
-   * Handling of the clean up of old carbondata files, index files, delete delta,
-   * and update status files.
-   * @param table clean up will be handled on this table.
-   * @param forceDelete if true then max query execution timeout will not be considered.
-   */
-  public static void cleanUpDeltaFiles(CarbonTable table, boolean forceDelete) throws IOException {
-
-    SegmentStatusManager ssm = new SegmentStatusManager(table.getAbsoluteTableIdentifier());
-
-    LoadMetadataDetails[] details =
-        SegmentStatusManager.readLoadMetadata(table.getMetadataPath());
-
-    SegmentUpdateStatusManager updateStatusManager = new SegmentUpdateStatusManager(table);
-    SegmentUpdateDetails[] segmentUpdateDetails = updateStatusManager.getUpdateStatusDetails();
-    // hold all the updated segments so that we can check the delta files in them; no need to
-    // check the others.
-    Set<String> updatedSegments = new HashSet<>();
-    for (SegmentUpdateDetails updateDetails : segmentUpdateDetails) {
-      updatedSegments.add(updateDetails.getSegmentName());
-    }
-
-    String validUpdateStatusFile = "";
-
-    boolean isAbortedFile = true;
-
-    boolean isInvalidFile = false;
-
-    // take the update status file name from the 0th segment.
-    validUpdateStatusFile = ssm.getUpdateStatusFileName(details);
-    // scan through each segment.
-    for (LoadMetadataDetails segment : details) {
-      // if this segment is valid then only we will go for delta file deletion.
-      // if the segment is marked for delete or compacted then anyway it will get deleted.
-      if (segment.getSegmentStatus() == SegmentStatus.SUCCESS
-          || segment.getSegmentStatus() == SegmentStatus.LOAD_PARTIAL_SUCCESS) {
-        // when there are no update operations done on the table, then no need to go ahead. So
-        // just check the update delta start timestamp and proceed if not empty
-        if (!segment.getUpdateDeltaStartTimestamp().isEmpty()
-            || updatedSegments.contains(segment.getLoadName())) {
-          // take the list of files from this segment.
-          String segmentPath = CarbonTablePath.getSegmentPath(
-              table.getAbsoluteTableIdentifier().getTablePath(), segment.getLoadName());
-          CarbonFile segDir =
-              FileFactory.getCarbonFile(segmentPath);
-          CarbonFile[] allSegmentFiles = segDir.listFiles();
-
-          // now handle all the delete delta files which need to be deleted.
-          // there are 2 cases here.
-          // 1. if the block is marked as compacted then the corresponding delta files
-          //    can be deleted if query exec timeout is done.
-          // 2. if the block is in success state then also there can be delete
-          //    delta compaction happened and old files can be deleted.
-
-          SegmentUpdateDetails[] updateDetails = updateStatusManager.readLoadMetadata();
-          for (SegmentUpdateDetails block : updateDetails) {
-            CarbonFile[] completeListOfDeleteDeltaFiles;
-            CarbonFile[] invalidDeleteDeltaFiles;
-
-            if (!block.getSegmentName().equalsIgnoreCase(segment.getLoadName())) {
-              continue;
-            }
-
-            // aborted scenario.
-            invalidDeleteDeltaFiles = updateStatusManager
-                .getDeleteDeltaInvalidFilesList(block, false,
-                    allSegmentFiles, isAbortedFile);
-            for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-              boolean doForceDelete = true;
-              compareTimestampsAndDelete(invalidFile, doForceDelete, false);
-            }
-
-            // case 1
-            if (CarbonUpdateUtil.isBlockInvalid(block.getSegmentStatus())) {
-              completeListOfDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, true,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : completeListOfDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-
-            } else {
-              invalidDeleteDeltaFiles = updateStatusManager
-                  .getDeleteDeltaInvalidFilesList(block, false,
-                      allSegmentFiles, isInvalidFile);
-              for (CarbonFile invalidFile : invalidDeleteDeltaFiles) {
-                compareTimestampsAndDelete(invalidFile, forceDelete, false);
-              }
-            }
-          }
-        }
-        // handle cleanup of merge index files and data files after small files merge
-        // happened for SI table
-
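
The removed block relies on a compareTimestampsAndDelete helper that is not shown in
this hunk. A rough, hypothetical sketch of that pattern (plain java.io.File for
illustration only, not the actual CarbonFile-based implementation):

    import java.io.File

    // hypothetical sketch: delete a delta file only when it is older than the
    // max query execution timeout, unless force deletion is requested
    def compareTimestampsAndDeleteSketch(
        file: File, forceDelete: Boolean, maxQueryTimeoutMillis: Long): Unit = {
      val age = System.currentTimeMillis() - file.lastModified()
      if (forceDelete || age > maxQueryTimeoutMillis) {
        file.delete()
      }
    }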

[GitHub] [carbondata] ajantha-bhat commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


ajantha-bhat commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739671395


   @kunal642, @vikramahuja1001, @akashrn5: I think all four of us are reviewing this PR. Please post comments as soon as you find issues rather than all at the end, so that we can avoid duplicate comments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] kunal642 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


kunal642 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537236952



##
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, excluding compaction segments
+    List<String> staleSegments = segmentFiles.stream()
+        .filter(segmentFile -> !DataFileUtil.getSegmentNoFromSegmentFile(segmentFile).contains(
+            CarbonCommonConstants.POINT))
+        .filter(segmentFile -> !loadNameSet.contains(

Review comment:
   Please combine both filters into one
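
   For illustration, a one-pass version could look like this (a Scala sketch of the
   Java stream logic above; DataFileUtil, CarbonCommonConstants.POINT and loadNameSet
   are taken from the quoted diff):

       import scala.collection.JavaConverters._

       // sketch: both conditions folded into a single filter pass
       val staleSegments = segmentFiles.asScala.filter { segmentFile =>
         val segmentNo = DataFileUtil.getSegmentNoFromSegmentFile(segmentFile)
         // drop compaction segments (their segment numbers contain a point, e.g. "0.1")
         // and segments that still exist in the table status metadata
         !segmentNo.contains(CarbonCommonConstants.POINT) && !loadNameSet.contains(segmentNo)
       }.asJava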





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537235165



##
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean the .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty the trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time;
+   *                             they are cleaned immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true, an exception must be thrown when CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default. To enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Unable to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4039: [WIP] Refactor and Fix Insert into partition issue with FileMergeSortComparator

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4039:
URL: https://github.com/apache/carbondata/pull/4039#issuecomment-739664975


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3313/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739664837


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5092/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537232374



##
File path: integration/spark/src/main/scala/org/apache/carbondata/trash/DataTrashManager.scala
##
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.trash
+
+import scala.collection.JavaConverters._
+
+import org.apache.carbondata.common.logging.LogServiceFactory
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.carbondata.core.datastore.filesystem.{CarbonFile, CarbonFileFilter}
+import org.apache.carbondata.core.datastore.impl.FileFactory
+import org.apache.carbondata.core.indexstore.PartitionSpec
+import org.apache.carbondata.core.locks.{CarbonLockUtil, ICarbonLock, LockUsage}
+import org.apache.carbondata.core.metadata.SegmentFileStore
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable
+import org.apache.carbondata.core.statusmanager.SegmentStatusManager
+import org.apache.carbondata.core.util.{CarbonProperties, CarbonUtil, CleanFilesUtil, TrashUtil}
+import org.apache.carbondata.core.util.path.CarbonTablePath
+
+object DataTrashManager {
+  private val LOGGER = LogServiceFactory.getLogService(this.getClass.getCanonicalName)
+
+  /**
+   * clean garbage data
+   *  1. check and clean the .Trash folder
+   *  2. move stale segments without metadata into .Trash
+   *  3. clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+   *
+   * @param isForceDelete        clean the MFD/Compacted segments immediately and empty the trash folder
+   * @param cleanStaleInProgress clean the In Progress segments based on retention time;
+   *                             they are cleaned immediately when force is true
+   */
+  def cleanGarbageData(
+      carbonTable: CarbonTable,
+      isForceDelete: Boolean,
+      cleanStaleInProgress: Boolean,
+      partitionSpecs: Option[Seq[PartitionSpec]] = None): Unit = {
+    // if isForceDelete = true, an exception must be thrown when CARBON_CLEAN_FILES_FORCE_ALLOWED is false
+    if (isForceDelete && !CarbonProperties.getInstance().isCleanFilesForceAllowed) {
+      LOGGER.error("Clean Files with Force option deletes the physical data and it cannot be" +
+        " recovered. It is disabled by default. To enable clean files with force option," +
+        " set " + CarbonCommonConstants.CARBON_CLEAN_FILES_FORCE_ALLOWED + " to true")
+      throw new RuntimeException("Clean files with force operation not permitted by default")
+    }
+    var carbonCleanFilesLock: ICarbonLock = null
+    try {
+      val errorMsg = "Clean files request failed for " +
+        s"${ carbonTable.getQualifiedName }" +
+        ". Unable to acquire the clean files lock because another clean files " +
+        "operation is running in the background."
+      carbonCleanFilesLock = CarbonLockUtil.getLockObject(carbonTable.getAbsoluteTableIdentifier,
+        LockUsage.CLEAN_FILES_LOCK, errorMsg)
+      // step 1: check and clean trash folder
+      checkAndCleanTrashFolder(carbonTable, isForceDelete)
+      // step 2: move stale segments which do not exist in metadata into .Trash
+      moveStaleSegmentsToTrash(carbonTable)
+      // step 3: clean expired segments (MARKED_FOR_DELETE, Compacted, In Progress)
+      cleanExpiredSegments(carbonTable, isForceDelete, cleanStaleInProgress, partitionSpecs)
+    } finally {
+      if (carbonCleanFilesLock != null) {
+        CarbonLockUtil.fileUnlock(carbonCleanFilesLock, LockUsage.CLEAN_FILES_LOCK)
+      }
+    }
+  }
+
+  private def checkAndCleanTrashFolder(carbonTable: CarbonTable, isForceDelete: Boolean): Unit = {
+    if (isForceDelete) {
+      // empty the trash folder
+      TrashUtil.emptyTrash(carbonTable.getTablePath)
+    } else {
+      // clear trash based on timestamp
+      TrashUtil.deleteExpiredDataFromTrash(carbonTable.getTablePath)
+    }
+  }
+
+  /**
+   * move stale segments to the trash folder, excluding compaction segments
+   */
+  private def moveStaleSegmentsToTrash(carbonTable: CarbonTable): Unit = {
+    if (carbonTable.isHivePartitionTable) {
+  

[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739664259


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3311/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230866



##
File path: integration/spark/src/main/scala/org/apache/carbondata/api/CarbonStore.scala
##
@@ -19,13 +19,13 @@ package org.apache.carbondata.api
 
 import java.io.{DataInputStream, FileNotFoundException, InputStreamReader}
 import java.time.{Duration, Instant}
-import java.util
 import java.util.{Collections, Comparator}
 
 import scala.collection.JavaConverters._
 import scala.util.control.Breaks.{break, breakable}
 
 import com.google.gson.Gson
+import java.util

Review comment:
   Why this change? I think CI will fail because of it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] vikramahuja1001 commented on a change in pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


vikramahuja1001 commented on a change in pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#discussion_r537230288



##
File path: core/src/main/java/org/apache/carbondata/core/util/CleanFilesUtil.java
##
@@ -163,8 +164,13 @@ private static void getStaleSegmentFiles(CarbonTable carbonTable, List<String> s
     }
     Set<String> loadNameSet = Arrays.stream(details).map(LoadMetadataDetails::getLoadName)
         .collect(Collectors.toSet());
-    List<String> staleSegments = segmentFiles.stream().filter(segmentFile -> !loadNameSet.contains(
-        DataFileUtil.getSegmentNoFromSegmentFile(segmentFile))).collect(Collectors.toList());
+    // get all stale segment files, excluding compaction segments

Review comment:
   So we don't want to send compacted segments to the trash folder?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] Indhumathi27 commented on pull request #4039: [WIP] Refactor and Fix Insert into partition issue with FileMergeSortComparator

2020-12-06 Thread GitBox


Indhumathi27 commented on pull request #4039:
URL: https://github.com/apache/carbondata/pull/4039#issuecomment-739660102


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739649949


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5071/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4032: [WIP][CARBONDATA-4065] Support MERGE INTO SQL Command

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4032:
URL: https://github.com/apache/carbondata/pull/4032#issuecomment-739649832


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3312/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4012: [CARBONDATA-4051] Geo spatial index algorithm improvement and UDFs enhancement

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4012:
URL: https://github.com/apache/carbondata/pull/4012#issuecomment-739649371


   Build Success with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3308/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739649028


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5070/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739648939


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5091/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739648788


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3335/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4029: [HOTFIX] Refact Carbon Util

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4029:
URL: https://github.com/apache/carbondata/pull/4029#issuecomment-739648475


   Build Success with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5064/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739648469


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3330/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739648471


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3334/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4012: [CARBONDATA-4051] Geo spatial index algorithm improvement and UDFs enhancement

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4012:
URL: https://github.com/apache/carbondata/pull/4012#issuecomment-739648470


   Build Success with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5065/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] QiangCai commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


QiangCai commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739642960


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739635308


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5067/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739635257


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3310/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739635141







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4029: [HOTFIX] Refact Carbon Util

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4029:
URL: https://github.com/apache/carbondata/pull/4029#issuecomment-739634371


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3307/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4044: [CARBONDATA-4062] Refactor clean files feature

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4044:
URL: https://github.com/apache/carbondata/pull/4044#issuecomment-739634201







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739634188


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5068/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4043: IUD Concurrency Improvement

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4043:
URL: https://github.com/apache/carbondata/pull/4043#issuecomment-739634097


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5062/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] Zhangshunyu commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


Zhangshunyu commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739621621


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739620979


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5087/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4029: [HOTFIX] Refact Carbon Util

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4029:
URL: https://github.com/apache/carbondata/pull/4029#issuecomment-739620953


   Build Failed  with Spark 2.3.4, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbonPRBuilder2.3/5088/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4043: IUD Concurrency Improvement

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4043:
URL: https://github.com/apache/carbondata/pull/4043#issuecomment-739620866







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4012: [CARBONDATA-4051] Geo spatial index algorithm improvement and UDFs enhancement

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4012:
URL: https://github.com/apache/carbondata/pull/4012#issuecomment-739620889







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4040: [WIP][CI TEST]

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4040:
URL: https://github.com/apache/carbondata/pull/4040#issuecomment-739620657


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3306/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] CarbonDataQA2 commented on pull request #4029: [HOTFIX] Refact Carbon Util

2020-12-06 Thread GitBox


CarbonDataQA2 commented on pull request #4029:
URL: https://github.com/apache/carbondata/pull/4029#issuecomment-739620454


   Build Failed  with Spark 2.4.5, Please check CI 
http://121.244.95.60:12545/job/ApacheCarbon_PR_Builder_2.4.5/3331/
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [carbondata] shenjiayu17 commented on a change in pull request #4012: [CARBONDATA-4051] Geo spatial index algorithm improvement and UDFs enhancement

2020-12-06 Thread GitBox


shenjiayu17 commented on a change in pull request #4012:
URL: https://github.com/apache/carbondata/pull/4012#discussion_r533974578



##
File path: geo/src/main/java/org/apache/carbondata/geo/scan/expression/PolylineListExpression.java
##
@@ -0,0 +1,202 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.geo.scan.expression;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import org.apache.carbondata.common.annotations.InterfaceAudience;
+import org.apache.carbondata.core.datastore.block.SegmentProperties;
+import org.apache.carbondata.core.metadata.datatype.DataTypes;
+import org.apache.carbondata.core.scan.expression.ColumnExpression;
+import org.apache.carbondata.core.scan.expression.Expression;
+import org.apache.carbondata.core.scan.expression.ExpressionResult;
+import org.apache.carbondata.core.scan.expression.UnknownExpression;
+import org.apache.carbondata.core.scan.expression.conditional.ConditionalExpression;
+import org.apache.carbondata.core.scan.filter.executer.FilterExecutor;
+import org.apache.carbondata.core.scan.filter.intf.ExpressionType;
+import org.apache.carbondata.core.scan.filter.intf.RowIntf;
+import org.apache.carbondata.core.scan.filter.resolver.FilterResolverIntf;
+import org.apache.carbondata.core.scan.filter.resolver.RowLevelFilterResolverImpl;
+import org.apache.carbondata.core.util.CustomIndex;
+import org.apache.carbondata.geo.GeoConstants;
+import org.apache.carbondata.geo.GeoHashIndex;
+import org.apache.carbondata.geo.GeoHashUtils;
+import org.apache.carbondata.geo.GeoOperationType;
+import org.apache.carbondata.geo.scan.filter.executor.PolygonFilterExecutorImpl;
+
+import org.locationtech.jts.geom.Coordinate;
+import org.locationtech.jts.geom.Geometry;
+import org.locationtech.jts.geom.GeometryFactory;
+import org.locationtech.jts.geom.LineString;
+import org.locationtech.jts.geom.Polygon;
+import org.locationtech.jts.io.WKTReader;
+import org.locationtech.jts.operation.buffer.BufferParameters;
+
+/**
+ * InPolylineList expression processor. It passes the InPolylineList string to the Geo
+ * implementation's query method, gets a list of ID ranges from each polygon, and
+ * calculates the and/or/diff range list to filter as an output. It then builds an
+ * InExpression with all the IDs present in those ranges.
+ */
+@InterfaceAudience.Internal
+public class PolylineListExpression extends UnknownExpression
+    implements ConditionalExpression {
+
+  private static final GeometryFactory geoFactory = new GeometryFactory();
+
+  private String polylineString;
+
+  private Float bufferInMeter;
+
+  private GeoHashIndex instance;
+
+  private List ranges = new ArrayList();
+
+  private ColumnExpression column;
+
+  private static final ExpressionResult trueExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, true);
+
+  private static final ExpressionResult falseExpRes =
+      new ExpressionResult(DataTypes.BOOLEAN, false);
+
+  public PolylineListExpression(String polylineString, Float bufferInMeter, String columnName,
+      CustomIndex indexInstance) {
+    this.polylineString = polylineString;
+    this.bufferInMeter = bufferInMeter;
+    this.instance = (GeoHashIndex) indexInstance;
+    this.column = new ColumnExpression(columnName, DataTypes.LONG);
+  }
+
+  private void processExpression() {
+    try {
+      // convert the buffer distance from meters to degrees
+      double buffer = bufferInMeter / GeoConstants.CONVERSION_FACTOR_OF_METER_TO_DEGREE;
+
+      // 1. parse the polyline list string and get a polygon from each polyline
+      List<Polygon> polygonList = new ArrayList<>();
+      WKTReader wktReader = new WKTReader();
+      Pattern pattern = Pattern.compile(GeoConstants.POLYLINE_REG_EXPRESSION);
+      Matcher matcher = pattern.matcher(polylineString);
+      while (matcher.find()) {
+        String matchedStr = matcher.group();
+        LineString polylineCreatedFromStr = (LineString) wktReader.read(matchedStr);
+        Polygon polygonFromPolylineBuffer = (Polygon) polylineCreatedFromStr.buffer(
+            buffer, 0,
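
processExpression builds one polygon per matched polyline by buffering it. A
standalone sketch of that JTS step (using only the locationtech JTS classes from the
imports above; the diff's buffer call also passes quadrant-segment and end-cap
parameters, omitted here, and the distance is assumed to be already converted to
degrees):

    import org.locationtech.jts.geom.{LineString, Polygon}
    import org.locationtech.jts.io.WKTReader

    // sketch: read a WKT polyline and buffer it into a polygon
    val wktReader = new WKTReader()
    val polyline = wktReader.read("LINESTRING (120.1 30.3, 120.2 30.4, 120.3 30.5)")
      .asInstanceOf[LineString]
    // buffering a line yields the polygon covering all points within the distance
    val polygon = polyline.buffer(0.001).asInstanceOf[Polygon]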
