Repository: incubator-carbondata Updated Branches: refs/heads/master ada081d89 -> 4a7adfa97
Changes done: 1. Clean up of folders created locally during data load and insert into operations. 2. Setting the load status properly for success, partial success and failure cases. 3. Printing load statistics in case of success and partial success. Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/487e41dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/487e41dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/487e41dd Branch: refs/heads/master Commit: 487e41ddd13d9e19813cc1b9c4eda73376f1c8ba Parents: ada081d Author: manishgupta88 <tomanishgupt...@gmail.com> Authored: Wed Apr 5 19:04:14 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 6 10:33:07 2017 +0530 ---------------------------------------------------------------------- .../carbondata/spark/load/CarbonLoaderUtil.java | 30 ++++++++++++++++---- .../spark/rdd/NewCarbonDataLoadRDD.scala | 30 ++++++++++++++++---- 2 files changed, 49 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java index cc16398..95f0b10 100644 --- a/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java +++ b/integration/spark-common/src/main/java/org/apache/carbondata/spark/load/CarbonLoaderUtil.java @@ -36,6 +36,9 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.Callable; 
+import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; @@ -171,13 +174,30 @@ public final class CarbonLoaderUtil { tempLocationKey = CarbonCommonConstants.COMPACTION_KEY_WORD + '_' + tempLocationKey; } // form local store location - String localStoreLocation = CarbonProperties.getInstance() + final String localStoreLocation = CarbonProperties.getInstance() .getProperty(tempLocationKey, CarbonCommonConstants.STORE_LOCATION_DEFAULT_VAL); + // submit local folder clean up in another thread so that main thread execution is not blocked + ExecutorService localFolderDeletionService = Executors.newFixedThreadPool(1); try { - CarbonUtil.deleteFoldersAndFiles(new File(localStoreLocation).getParentFile()); - LOGGER.info("Deleted the local store location" + localStoreLocation); - } catch (IOException | InterruptedException e) { - LOGGER.error(e, "Failed to delete local data load folder location"); + localFolderDeletionService.submit(new Callable<Void>() { + @Override public Void call() throws Exception { + try { + long startTime = System.currentTimeMillis(); + File file = new File(localStoreLocation); + CarbonUtil.deleteFoldersAndFiles(file.getParentFile()); + LOGGER.info( + "Deleted the local store location" + localStoreLocation + " : Time taken: " + ( + System.currentTimeMillis() - startTime)); + } catch (IOException | InterruptedException e) { + LOGGER.error(e, "Failed to delete local data load folder location"); + } + return null; + } + }); + } finally { + if (null != localFolderDeletionService) { + localFolderDeletionService.shutdown(); + } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/487e41dd/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 0690ba1..72ee90f 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -223,7 +223,7 @@ class NewCarbonDataLoadRDD[K, V]( carbonLoadModel.getTableName + CarbonCommonConstants.UNDERSCORE + theSplit.index try { loadMetadataDetails.setPartitionCount(partitionID) - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) carbonLoadModel.setSegmentId(String.valueOf(loadCount)) val preFetch = CarbonProperties.getInstance().getProperty(CarbonCommonConstants @@ -237,7 +237,6 @@ class NewCarbonDataLoadRDD[K, V]( loadMetadataDetails) // Intialize to set carbon properties loader.initialize() - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders) @@ -246,9 +245,20 @@ class NewCarbonDataLoadRDD[K, V]( loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) logInfo("Bad Record Found") case e: Exception => + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) logInfo("DataLoad failure", e) LOGGER.error(e) throw e + } finally { + // clean up the folders and files created locally for data load operation + CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false) + // in case of failure the same operation will be re-tried several times. 
+ // So print the data load statistics only in case of non failure case + if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE + .equals(loadMetadataDetails.getLoadStatus)) { + CarbonTimeStatisticsFactory.getLoadStatisticsInstance + .printStatisticsInfo(model.getPartitionId) + } } def getInputIterators: Array[CarbonIterator[Array[AnyRef]]] = { @@ -389,7 +399,7 @@ class NewDataFrameLoaderRDD[K, V]( try { loadMetadataDetails.setPartitionCount(partitionID) - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) carbonLoadModel.setPartitionId(partitionID) carbonLoadModel.setSegmentId(String.valueOf(loadCount)) carbonLoadModel.setTaskNo(String.valueOf(theSplit.index)) @@ -420,18 +430,26 @@ class NewDataFrameLoaderRDD[K, V]( loadMetadataDetails) // Intialize to set carbon properties loader.initialize() - - loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_SUCCESS) new DataLoadExecutor().execute(model, loader.storeLocation, recordReaders.toArray) - } catch { case e: BadRecordFoundException => loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_PARTIAL_SUCCESS) logInfo("Bad Record Found") case e: Exception => + loadMetadataDetails.setLoadStatus(CarbonCommonConstants.STORE_LOADSTATUS_FAILURE) logInfo("DataLoad failure", e) LOGGER.error(e) throw e + } finally { + // clean up the folders and files created locally for data load operation + CarbonLoaderUtil.deleteLocalDataLoadFolderLocation(model, false) + // in case of failure the same operation will be re-tried several times. + // So print the data load statistics only in case of non failure case + if (!CarbonCommonConstants.STORE_LOADSTATUS_FAILURE + .equals(loadMetadataDetails.getLoadStatus)) { + CarbonTimeStatisticsFactory.getLoadStatisticsInstance + .printStatisticsInfo(model.getPartitionId) + } } var finished = false