[CARBONDATA-2817] Thread leak in update and in no-sort flow

Issue: After the Update command finishes, the loading threads are not stopped.

Root Cause:

In the update flow, DataLoadExecutor's close method is never called, so its executor services are not shut down.
Exceptions are not handled properly in AbstractFactDataWriter's closeExecutorService(), which causes a thread leak if the job is killed from the Spark UI.
Solution:

Add a task completion listener and call DataLoadExecutor's close method from it.
Handle exceptions in closeExecutorService() so that the threads of all writer steps are closed; a condensed sketch of this cleanup pattern is shown below.
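
For reference, the sketch below illustrates the shutdown pattern this fix applies in AbstractFactDataWriter.closeExecutorService() and CarbonFactDataWriterImplV3.closeWriter(): run every cleanup step, remember only the first failure, and rethrow it after all thread pools have been released. The class, method, and variable names in the sketch are hypothetical stand-ins, not CarbonData APIs; the actual change is in the diff below.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the "hold the first failure, finish cleanup, then rethrow" pattern.
public final class WriterShutdownSketch {

  // Shut down a writer thread pool without letting an early failure skip later cleanup.
  static void shutdownAndRethrow(ExecutorService pool, List<Future<?>> submitted)
      throws Exception {
    Exception firstFailure = null;
    try {
      pool.shutdown();                          // stop accepting new writer tasks
      pool.awaitTermination(2, TimeUnit.HOURS); // wait for in-flight writes
      for (Future<?> task : submitted) {
        task.get();                             // surface failures from submitted writes
      }
    } catch (InterruptedException | ExecutionException e) {
      firstFailure = e;                         // remember it, but keep cleaning up
    } finally {
      pool.shutdownNow();                       // make sure no worker thread survives
    }
    if (firstFailure != null) {
      throw firstFailure;                       // report the failure only after cleanup
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService pool = Executors.newFixedThreadPool(2);
    List<Future<?>> submitted = new ArrayList<>();
    submitted.add(pool.submit(() -> { /* pretend to write a data file */ }));
    shutdownAndRethrow(pool, submitted);
  }
}

On the Scala side, the update flow registers a TaskContext completion listener that calls DataLoadExecutor.close() and frees the task's unsafe memory, as shown in the UpdateDataLoad.scala hunk below.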

This closes #2606


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/eb01af16
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/eb01af16
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/eb01af16

Branch: refs/heads/branch-1.4
Commit: eb01af1630d60eb79ef82c469406e6311a246d7b
Parents: 7598dea
Author: BJangir <babulaljangir...@gmail.com>
Authored: Thu Aug 2 21:51:07 2018 +0530
Committer: ravipesala <ravi.pes...@gmail.com>
Committed: Thu Aug 9 23:51:36 2018 +0530

----------------------------------------------------------------------
 .../core/util/BlockletDataMapUtil.java          |  4 +-
 .../carbondata/spark/rdd/UpdateDataLoad.scala   |  9 +++-
 .../CarbonRowDataWriterProcessorStepImpl.java   | 52 +++++++++++++++++---
 .../steps/DataWriterBatchProcessorStepImpl.java | 25 ++++++++--
 .../store/writer/AbstractFactDataWriter.java    | 16 ++++--
 .../writer/v3/CarbonFactDataWriterImplV3.java   | 19 +++++--
 6 files changed, 103 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
index 68ce1fb..404b426 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/BlockletDataMapUtil.java
@@ -115,7 +115,7 @@ public class BlockletDataMapUtil {
        CarbonTable.updateTableByTableInfo(carbonTable, carbonTable.getTableInfo());
      }
      String blockPath = footer.getBlockInfo().getTableBlockInfo().getFilePath();
-      if (null != fileNameToMetaInfoMapping && null == blockMetaInfoMap.get(blockPath)) {
+      if (null == blockMetaInfoMap.get(blockPath)) {
         BlockMetaInfo blockMetaInfo = createBlockMetaInfo(fileNameToMetaInfoMapping, blockPath);
         // if blockMetaInfo is null that means the file has been deleted from the file system.
         // This can happen in case IUD scenarios where after deleting or updating the data the
@@ -123,8 +123,6 @@ public class BlockletDataMapUtil {
         if (null != blockMetaInfo) {
           blockMetaInfoMap.put(blockPath, blockMetaInfo);
         }
-      } else {
-        blockMetaInfoMap.put(blockPath, new BlockMetaInfo(new String[] {},0));
       }
     }
     return blockMetaInfoMap;

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
----------------------------------------------------------------------
diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
index 2e7c307..f4fdbc1 100644
--- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
+++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/UpdateDataLoad.scala
@@ -25,8 +25,10 @@ import org.apache.spark.sql.Row
 import org.apache.carbondata.common.CarbonIterator
 import org.apache.carbondata.common.logging.LogServiceFactory
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus}
+import org.apache.carbondata.core.util.ThreadLocalTaskInfo
 import org.apache.carbondata.processing.loading.{DataLoadExecutor, TableProcessingOperations}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
+import org.apache.carbondata.spark.util.CommonUtil
 
 /**
  * Data load in case of update command .
@@ -54,7 +56,12 @@ object UpdateDataLoad {
       loader.initialize()
 
       loadMetadataDetails.setSegmentStatus(SegmentStatus.SUCCESS)
-      new DataLoadExecutor().execute(carbonLoadModel,
+      val executor = new DataLoadExecutor
+      TaskContext.get().addTaskCompletionListener { context =>
+        executor.close()
+        CommonUtil.clearUnsafeMemory(ThreadLocalTaskInfo.getCarbonTaskInfo.getTaskId)
+      }
+      executor.execute(carbonLoadModel,
         loader.storeLocation,
         recordReaders.toArray)
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
index 1a05b12..ac13d24 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/CarbonRowDataWriterProcessorStepImpl.java
@@ -18,7 +18,9 @@ package org.apache.carbondata.processing.loading.steps;
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -80,11 +82,16 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
 
   private Map<String, LocalDictionaryGenerator> localDictionaryGeneratorMap;
 
+  private List<CarbonFactHandler> carbonFactHandlers;
+
+  private ExecutorService executorService = null;
+
   public CarbonRowDataWriterProcessorStepImpl(CarbonDataLoadConfiguration configuration,
       AbstractDataLoadProcessorStep child) {
     super(configuration, child);
     this.localDictionaryGeneratorMap =
         CarbonUtil.getLocalDictionaryModel(configuration.getTableSpec().getCarbonTable());
+    this.carbonFactHandlers = new CopyOnWriteArrayList<>();
   }
 
   @Override public void initialize() throws IOException {
@@ -107,7 +114,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
     final Iterator<CarbonRowBatch>[] iterators = child.execute();
     tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier();
     tableName = tableIdentifier.getTableName();
-    ExecutorService executorService = null;
     try {
       readCounter = new long[iterators.length];
       writeCounter = new long[iterators.length];
@@ -149,10 +155,6 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
         throw new BadRecordFoundException(e.getMessage(), e);
       }
      throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e);
-    } finally {
-      if (null != executorService && executorService.isShutdown()) {
-        executorService.shutdownNow();
-      }
     }
     return null;
   }
@@ -169,13 +171,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       if (rowsNotExist) {
         rowsNotExist = false;
         dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model);
+        this.carbonFactHandlers.add(dataHandler);
         dataHandler.initialise();
       }
       processBatch(iterator.next(), dataHandler, iteratorIndex);
     }
-    if (!rowsNotExist) {
-      finish(dataHandler, iteratorIndex);
+    try {
+      if (!rowsNotExist) {
+        finish(dataHandler, iteratorIndex);
+      }
+    } finally {
+      carbonFactHandlers.remove(dataHandler);
     }
+
+
   }
 
   @Override protected String getStepName() {
@@ -183,10 +192,15 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
   }
 
   private void finish(CarbonFactHandler dataHandler, int iteratorIndex) {
+    CarbonDataWriterException exception = null;
     try {
       dataHandler.finish();
     } catch (Exception e) {
+      // if throw exception from here dataHandler will not be closed.
+      // so just holding exception and later throwing exception
       LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+      exception = new CarbonDataWriterException(
+          "Failed for table: " + tableName + " in  finishing data handler", e);
     }
     LOGGER.info("Record Processed For table: " + tableName);
     String logMessage =
@@ -194,13 +208,20 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
             + ": Write: " + readCounter[iteratorIndex];
     LOGGER.info(logMessage);
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
-    processingComplete(dataHandler);
+    try {
+      processingComplete(dataHandler);
+    } catch (CarbonDataLoadingException e) {
+      exception = new CarbonDataWriterException(e.getMessage(), e);
+    }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
+    if (null != exception) {
+      throw exception;
+    }
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException {
@@ -310,4 +331,19 @@ public class CarbonRowDataWriterProcessorStepImpl extends AbstractDataLoadProces
       }
     }
   }
+
+  @Override public void close() {
+    if (!closed) {
+      super.close();
+      if (null != executorService) {
+        executorService.shutdownNow();
+      }
+      if (null != this.carbonFactHandlers && !this.carbonFactHandlers.isEmpty()) {
+        for (CarbonFactHandler carbonFactHandler : this.carbonFactHandlers) {
+          carbonFactHandler.finish();
+          carbonFactHandler.closeHandler();
+        }
+      }
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
index 5663811..26ae2d7 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterBatchProcessorStepImpl.java
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.carbondata.common.logging.LogService;
 import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
 import org.apache.carbondata.core.datastore.row.CarbonRow;
 import org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator;
 import org.apache.carbondata.core.metadata.CarbonTableIdentifier;
@@ -98,8 +99,14 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
                 .createCarbonFactHandler(model);
             carbonFactHandler.initialise();
             processBatch(next, carbonFactHandler);
-            finish(tableName, carbonFactHandler);
-            this.carbonFactHandler = null;
+            try {
+              finish(tableName, carbonFactHandler);
+            } finally {
+              // we need to make carbonFactHandler =null as finish will call closehandler
+              // even finish throws exception
+              // otherwise close() will call finish method again for same handler.
+              this.carbonFactHandler = null;
+            }
           }
         }
         i++;
@@ -119,19 +126,31 @@ public class DataWriterBatchProcessorStepImpl extends AbstractDataLoadProcessorS
   }
 
   private void finish(String tableName, CarbonFactHandler dataHandler) {
+    CarbonDataWriterException exception = null;
     try {
       dataHandler.finish();
     } catch (Exception e) {
+      // if throw exception from here dataHandler will not be closed.
+      // so just holding exception and later throwing exception
       LOGGER.error(e, "Failed for table: " + tableName + " in  finishing data handler");
+      exception = new CarbonDataWriterException(
+          "Failed for table: " + tableName + " in  finishing data handler", e);
     }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance().recordTotalRecords(rowCounter.get());
-    processingComplete(dataHandler);
+    try {
+      processingComplete(dataHandler);
+    } catch (Exception e) {
+      exception = new CarbonDataWriterException(e.getMessage(), e);
+    }
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordDictionaryValue2MdkAdd2FileTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
     CarbonTimeStatisticsFactory.getLoadStatisticsInstance()
         .recordMdkGenerateTotalTime(CarbonTablePath.DEPRECATED_PATITION_ID,
             System.currentTimeMillis());
+    if (null != exception) {
+      throw exception;
+    }
   }
 
   private void processingComplete(CarbonFactHandler dataHandler) {

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
index 3e71e45..836e2c8 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/AbstractFactDataWriter.java
@@ -415,20 +415,30 @@ public abstract class AbstractFactDataWriter implements CarbonFactDataWriter {
    * @throws CarbonDataWriterException
    */
   protected void closeExecutorService() throws CarbonDataWriterException {
+    CarbonDataWriterException exception = null;
     try {
       listener.finish();
+      listener = null;
+    } catch (IOException e) {
+      exception = new CarbonDataWriterException(e);
+    }
+    try {
       executorService.shutdown();
       executorService.awaitTermination(2, TimeUnit.HOURS);
       for (int i = 0; i < executorServiceSubmitList.size(); i++) {
         executorServiceSubmitList.get(i).get();
       }
-      listener = null;
-    } catch (InterruptedException | ExecutionException | IOException e) {
-      throw new CarbonDataWriterException(e);
+    } catch (InterruptedException | ExecutionException e) {
+      if (null == exception) {
+        exception = new CarbonDataWriterException(e);
+      }
     }
     if (null != fallbackExecutorService) {
       fallbackExecutorService.shutdownNow();
     }
+    if (exception != null) {
+      throw exception;
+    }
   }
 
 

http://git-wip-us.apache.org/repos/asf/carbondata/blob/eb01af16/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
----------------------------------------------------------------------
diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
index dc6e443..f992e44 100644
--- a/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
+++ b/processing/src/main/java/org/apache/carbondata/processing/store/writer/v3/CarbonFactDataWriterImplV3.java
@@ -350,14 +350,25 @@ public class CarbonFactDataWriterImplV3 extends AbstractFactDataWriter {
    * @throws CarbonDataWriterException
    */
   public void closeWriter() throws CarbonDataWriterException {
-    commitCurrentFile(true);
+    CarbonDataWriterException exception = null;
     try {
+      commitCurrentFile(true);
       writeIndexFile();
-    } catch (IOException e) {
+    } catch (Exception e) {
       LOGGER.error(e, "Problem while writing the index file");
-      throw new CarbonDataWriterException("Problem while writing the index file", e);
+      exception = new CarbonDataWriterException("Problem while writing the index file", e);
+    } finally {
+      try {
+        closeExecutorService();
+      } catch (CarbonDataWriterException e) {
+        if (null == exception) {
+          exception = e;
+        }
+      }
+    }
+    if (null != exception) {
+      throw exception;
     }
-    closeExecutorService();
   }
 
   @Override public void writeFooterToFile() throws CarbonDataWriterException {
