[CARBONDATA-2817] Thread Leak in Update and in No sort flow Issue :- After Update Command is finished, Loading threads are not getting stopped.
Root Cause :- In Update flow DataLoadExecutor's close method is not called, so all Executor services are not closed. Exceptions are not handled properly in AFDW class's closeExecutorService(), which is causing a Thread leak if the Job is killed from SparkUI. Solution :- Add a Task Completion Listener and call the close method of DataLoadExecutor in it. Handle Exceptions in closeExecutorService() so that all Writer step Threads can be closed. This closes #2606 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb01af16 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb01af16 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb01af16 Branch: refs/heads/branch-1.4 Commit: eb01af1630d60eb79ef82c469406e6311a246d7b Parents: 7598dea Author: BJangir <babulaljangir...@gmail.com> Authored: Thu Aug 2 21:51:07 2018 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Aug 9 23:51:36 2018 +0530 ---------------------------------------------------------------------- .../core/util/BlockletDataMapUtil.java | 4 +- .../carbondata/spark/rdd/UpdateDataLoad.scala | 9 +++- .../CarbonRowDataWriterProcessorStepImpl.java | 52 +++++++++++++++++--- .../steps/DataWriterBatchProcessorStepImpl.java | 25 ++++++++-- .../store/writer/AbstractFactDataWriter.java | 16 ++++-- .../writer/v3/CarbonFactDataWriterImplV3.java | 19 +++++-- 6 files changed, 103 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java index 68ce1fb..404b426 100644 --- 
a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java +++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java @@ -115,7 +115,7 @@ public class BlockletDataMapUtil { CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo()); } String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath(); - if (null != fileNameToMetaInfoMapping && null == blockMetaInfoMap.get(blockPath)) { + if (null == blockMetaInfoMap.get(blockPath)) { BlockMetaInfo blockMetaInfo = createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath); // if blockMetaInfo is null that means the file has been deleted from the file system. // This can happen in case IUD scenarios where after deleting or updating the data the @@ -123,8 +123,6 @@ public class BlockletDataMapUtil { if (null != blockMetaInfo) { blockMetaInfoMap.put(blockPath, blockMetaInfo); } - } else { - blockMetaInfoMap.put(blockPath, new BlockMetaInfo(new String[] {},0)); } } return blockMetaInfoMap; http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala index 2e7c307..f4fdbc1 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala @@ -25,8 +25,10 @@ import org.apache.spark.sql.Row import org.apache.carbondata.common.CarbonIterator import org.apache.carbondata.common.logging.LogServiceFactory import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus} +import org.apache.carbondata.core.util.ThreadLocalTaskInfo import 
org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations} import org.apache.carbondata.processing.loading.model.CarbonLoadModel +import org.apache.carbondata.spark.util.CommonUtil /** * Data load in case of update command . @@ -54,7 +56,12 @@ object UpdateDataLoad { loader.initialize() loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS) - new DataLoadExecutor().execute(carbonLoadModel, + val executor = new DataLoadExecutor + TaskContext.get().addTaskCompletionListener { context => + executor.close() + CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId) + } + executor.execute(carbonLoadModel, loader.storeLocation, recordReaders.toArray) http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java index 1a05b12..ac13d24 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java @@ -18,7 +18,9 @@ package org.apache.carbondata.processing.loading.steps; import java.io.IOException; import java.util.Iterator; +import java.util.List; import java.util.Map; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; @@ -80,11 +82,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap; + 
private List<CarbonFactHandler> carbonFactHandlers; + + private ExecutorService executorService = null; + public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration, AbstractDataLoadProcessorStep child) { super(configuration, child); this.localDictionaryGeneratorMap = CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable()); + this.carbonFactHandlers = new CopyOnWriteArrayList<>(); } @Override public void initialize() throws IOException { @@ -107,7 +114,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces final Iterator<CarbonRowBatch>[] iterators = child.execute(); tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); tableName = tableIdentifier.getTableName(); - ExecutorService executorService = null; try { readCounter = new long[iterators.length]; writeCounter = new long[iterators.length]; @@ -149,10 +155,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces throw new BadRecordFoundException(e.getMessage(), e); } throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e); - } finally { - if (null != executorService && executorService.isShutdown()) { - executorService.shutdownNow(); - } } return null; } @@ -169,13 +171,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces if (rowsNotExist) { rowsNotExist = false; dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model); + this.carbonFactHandlers.add(dataHandler); dataHandler.initialise(); } processBatch(iterator.next(), dataHandler, iteratorIndex); } - if (!rowsNotExist) { - finish(dataHandler, iteratorIndex); + try { + if (!rowsNotExist) { + finish(dataHandler, iteratorIndex); + } + } finally { + carbonFactHandlers.remove(dataHandler); } + + } @Override protected String getStepName() { @@ -183,10 +192,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } 
private void finish(CarbonFactHandler dataHandler, int iteratorIndex) { + CarbonDataWriterException exception = null; try { dataHandler.finish(); } catch (Exception e) { + // if throw exception from here dataHandler will not be closed. + // so just holding exception and later throwing exception LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler"); + exception = new CarbonDataWriterException( + "Failed for table: " + tableName + " in finishing data handler", e); } LOGGER.info("Record Processed For table: " + tableName); String logMessage = @@ -194,13 +208,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces + ": Write: " + readCounter[iteratorIndex]; LOGGER.info(logMessage); CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); - processingComplete(dataHandler); + try { + processingComplete(dataHandler); + } catch (CarbonDataLoadingException e) { + exception = new CarbonDataWriterException(e.getMessage(), e); + } CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); + if (null != exception) { + throw exception; + } } private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { @@ -310,4 +331,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces } } } + + @Override public void close() { + if (!closed) { + super.close(); + if (null != executorService) { + executorService.shutdownNow(); + } + if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) { + for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) { + carbonFactHandler.finish(); + carbonFactHandler.closeHandler(); + } + } + } + } } 
http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java index 5663811..26ae2d7 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java @@ -22,6 +22,7 @@ import java.util.Map; import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; +import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; @@ -98,8 +99,14 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS .createCarbonFactHandler(model); carbonFactHandler.initialise(); processBatch(next, carbonFactHandler); - finish(tableName, carbonFactHandler); - this.carbonFactHandler = null; + try { + finish(tableName, carbonFactHandler); + } finally { + // we need to make carbonFactHandler =null as finish will call closehandler + // even finish throws exception + // otherwise close() will call finish method again for same handler. 
+ this.carbonFactHandler = null; + } } } i++; @@ -119,19 +126,31 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS } private void finish(String tableName, CarbonFactHandler dataHandler) { + CarbonDataWriterException exception = null; try { dataHandler.finish(); } catch (Exception e) { + // if throw exception from here dataHandler will not be closed. + // so just holding exception and later throwing exception LOGGER.error(e, "Failed for table: " + tableName + " in finishing data handler"); + exception = new CarbonDataWriterException( + "Failed for table: " + tableName + " in finishing data handler", e); } CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get()); - processingComplete(dataHandler); + try { + processingComplete(dataHandler); + } catch (Exception e) { + exception = new CarbonDataWriterException(e.getMessage(), e); + } CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); CarbonTimeStatisticsFactory.getLoadStatisticsInstance() .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID, System.currentTimeMillis()); + if (null != exception) { + throw exception; + } } private void processingComplete(CarbonFactHandler dataHandler) { http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java index 3e71e45..836e2c8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java +++ 
b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java @@ -415,20 +415,30 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter { * @throws CarbonDataWriterException */ protected void closeExecutorService() throws CarbonDataWriterException { + CarbonDataWriterException exception = null; try { listener.finish(); + listener = null; + } catch (IOException e) { + exception = new CarbonDataWriterException(e); + } + try { executorService.shutdown(); executorService.awaitTermination(2, TimeUnit.HOURS); for (int i = 0; i < executorServiceSubmitList.size(); i++) { executorServiceSubmitList.get(i).get(); } - listener = null; - } catch (InterruptedException | ExecutionException | IOException e) { - throw new CarbonDataWriterException(e); + } catch (InterruptedException | ExecutionException e) { + if (null == exception) { + exception = new CarbonDataWriterException(e); + } } if (null != fallbackExecutorService) { fallbackExecutorService.shutdownNow(); } + if (exception != null) { + throw exception; + } } http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java index dc6e443..f992e44 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java @@ -350,14 +350,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter { * @throws CarbonDataWriterException */ public void closeWriter() throws CarbonDataWriterException { - 
commitCurrentFile(true); + CarbonDataWriterException exception = null; try { + commitCurrentFile(true); writeIndexFile(); - } catch (IOException e) { + } catch (Exception e) { LOGGER.error(e, "Problem while writing the index file"); - throw new CarbonDataWriterException("Problem while writing the index file", e); + exception = new CarbonDataWriterException("Problem while writing the index file", e); + } finally { + try { + closeExecutorService(); + } catch (CarbonDataWriterException e) { + if (null == exception) { + exception = e; + } + } + } + if (null != exception) { + throw exception; } - closeExecutorService(); } @Override public void writeFooterToFile() throws CarbonDataWriterException {