Repository: carbondata Updated Branches: refs/heads/master 934216de1 -> 469c52f5d
[HOTFIX] Throw original exception in thread pool If an exception occurs in Callable.call in the thread pool, the original exception should be thrown instead of a new one, because throwing a new one makes debugging hard. This closes #2887 Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/469c52f5 Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/469c52f5 Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/469c52f5 Branch: refs/heads/master Commit: 469c52f5d4c18579e2a6ed4c3bb35691cf01937b Parents: 934216d Author: Jacky Li <jacky.li...@qq.com> Authored: Wed Oct 31 20:16:11 2018 +0800 Committer: xuchuanyin <xuchuan...@hust.edu.cn> Committed: Mon Nov 5 09:15:53 2018 +0800 ---------------------------------------------------------------------- .../spark/rdd/NewCarbonDataLoadRDD.scala | 2 + .../carbondata/spark/rdd/PartitionDropper.scala | 7 +- .../spark/rdd/PartitionSplitter.scala | 4 +- .../steps/DataWriterProcessorStepImpl.java | 68 ++++++-------------- 4 files changed, 27 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala index 041dc1c..0b6a2a9 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/NewCarbonDataLoadRDD.scala @@ -156,6 +156,8 @@ class NewCarbonDataLoadRDD[K, V]( logInfo("Bad Record Found") case e: Exception => 
loadMetadataDetails.setSegmentStatus(SegmentStatus.LOAD_FAILURE) + executionErrors.failureCauses = FailureCauses.EXECUTOR_FAILURE + executionErrors.errorMsg = e.getMessage logInfo("DataLoad failure", e) LOGGER.error(e) throw e http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala index 6911b0b..353a478 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionDropper.scala @@ -102,8 +102,8 @@ object PartitionDropper { Seq(partitionId, targetPartitionId).toList, dbName, tableName, partitionInfo) } catch { - case e: IOException => sys.error(s"Exception while delete original carbon files " + - e.getMessage) + case e: IOException => + throw new IOException("Exception while delete original carbon files ", e) } Audit.log(logger, s"Drop Partition request completed for table " + s"${ dbName }.${ tableName }") @@ -111,7 +111,8 @@ object PartitionDropper { s"${ dbName }.${ tableName }") } } catch { - case e: Exception => sys.error(s"Exception in dropping partition action: ${ e.getMessage }") + case e: Exception => + throw new RuntimeException("Exception in dropping partition action", e) } } else { PartitionUtils.deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier, http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala ---------------------------------------------------------------------- diff --git 
a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala index ca9f049..369ad51 100644 --- a/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala +++ b/integration/spark-common/src/main/scala/org/apache/carbondata/spark/rdd/PartitionSplitter.scala @@ -87,8 +87,8 @@ object PartitionSplitter { deleteOriginalCarbonFile(alterPartitionModel, absoluteTableIdentifier, Seq(partitionId).toList, databaseName, tableName, partitionInfo) } catch { - case e: IOException => sys.error(s"Exception while delete original carbon files " + - e.getMessage) + case e: IOException => + throw new IOException("Exception while delete original carbon files ", e) } Audit.log(logger, s"Add/Split Partition request completed for table " + s"${ databaseName }.${ tableName }") http://git-wip-us.apache.org/repos/asf/carbondata/blob/469c52f5/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java index 1657476..2dc3275 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/loading/steps/DataWriterProcessorStepImpl.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException; import org.apache.carbondata.core.datastore.row.CarbonRow; -import org.apache.carbondata.core.keygenerator.KeyGenException; import 
org.apache.carbondata.core.localdictionary.generator.LocalDictionaryGenerator; import org.apache.carbondata.core.metadata.CarbonTableIdentifier; import org.apache.carbondata.core.util.CarbonThreadFactory; @@ -122,7 +121,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { // do this concurrently for (Iterator<CarbonRowBatch> iterator : iterators) { rangeExecutorServiceSubmitList.add( - rangeExecutorService.submit(new WriterForwarder(iterator, tableIdentifier, i))); + rangeExecutorService.submit(new WriterForwarder(iterator, i))); i++; } try { @@ -131,15 +130,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { for (int j = 0; j < rangeExecutorServiceSubmitList.size(); j++) { rangeExecutorServiceSubmitList.get(j).get(); } - } catch (InterruptedException | ExecutionException e) { + } catch (InterruptedException e) { throw new CarbonDataWriterException(e); + } catch (ExecutionException e) { + throw new CarbonDataWriterException(e.getCause()); } } catch (CarbonDataWriterException e) { - LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e); - throw new CarbonDataLoadingException( - "Error while initializing data handler : " + e.getMessage()); + throw new CarbonDataLoadingException("Error while initializing writer: " + e.getMessage(), e); } catch (Exception e) { - LOGGER.error("Failed for table: " + tableName + " in DataWriterProcessorStepImpl", e); throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage(), e); } return null; @@ -154,19 +152,14 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { */ private final class WriterForwarder implements Callable<Void> { private Iterator<CarbonRowBatch> insideRangeIterator; - private CarbonTableIdentifier tableIdentifier; private int rangeId; - public WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, - CarbonTableIdentifier tableIdentifier, int rangeId) { + 
WriterForwarder(Iterator<CarbonRowBatch> insideRangeIterator, int rangeId) { this.insideRangeIterator = insideRangeIterator; - this.tableIdentifier = tableIdentifier; this.rangeId = rangeId; } - @Override public Void call() throws Exception { - LOGGER.info("Process writer forward for table " + tableIdentifier.getTableName() - + ", range: " + rangeId); + @Override public Void call() { processRange(insideRangeIterator, rangeId); return null; } @@ -184,8 +177,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { while (insideRangeIterator.hasNext()) { if (rowsNotExist) { rowsNotExist = false; - dataHandler = CarbonFactHandlerFactory - .createCarbonFactHandler(model); + dataHandler = CarbonFactHandlerFactory.createCarbonFactHandler(model); carbonFactHandlers.add(dataHandler); dataHandler.initialise(); } @@ -201,12 +193,7 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { CarbonTableIdentifier tableIdentifier = configuration.getTableIdentifier().getCarbonTableIdentifier(); String tableName = tableIdentifier.getTableName(); - - try { - dataHandler.finish(); - } catch (Exception e) { - LOGGER.error("Failed for table: " + tableName + " in finishing data handler", e); - } + dataHandler.finish(); LOGGER.info("Record Processed For table: " + tableName); String logMessage = "Finished Carbon DataWriterProcessorStepImpl: Read: " + readCounter + ": Write: " @@ -222,41 +209,24 @@ public class DataWriterProcessorStepImpl extends AbstractDataLoadProcessorStep { System.currentTimeMillis()); } - private void processingComplete(CarbonFactHandler dataHandler) throws CarbonDataLoadingException { + private void processingComplete(CarbonFactHandler dataHandler) { if (null != dataHandler) { - try { - dataHandler.closeHandler(); - } catch (CarbonDataWriterException e) { - LOGGER.error(e.getMessage(), e); - throw new CarbonDataLoadingException(e.getMessage(), e); - } catch (Exception e) { - LOGGER.error(e.getMessage(), e); - 
throw new CarbonDataLoadingException("There is an unexpected error: " + e.getMessage()); - } + dataHandler.closeHandler(); } } - private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) - throws CarbonDataLoadingException { - try { - while (batch.hasNext()) { - CarbonRow row = batch.next(); - dataHandler.addDataToStore(row); - readCounter++; - } - } catch (Exception e) { - throw new CarbonDataLoadingException(e); + private void processBatch(CarbonRowBatch batch, CarbonFactHandler dataHandler) { + while (batch.hasNext()) { + CarbonRow row = batch.next(); + dataHandler.addDataToStore(row); + readCounter++; } rowCounter.getAndAdd(batch.getSize()); } - public void processRow(CarbonRow row, CarbonFactHandler dataHandler) throws KeyGenException { - try { - readCounter++; - dataHandler.addDataToStore(row); - } catch (Exception e) { - throw new CarbonDataLoadingException(e); - } + public void processRow(CarbonRow row, CarbonFactHandler dataHandler) { + readCounter++; + dataHandler.addDataToStore(row); rowCounter.getAndAdd(1); }