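[CARBONDATA-814] Bad record log file writing is not correct: rename in-progress bad record files to normal when the data converter step closes, instead of in the fact data handler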
Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/3af2d650 Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/3af2d650 Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/3af2d650 Branch: refs/heads/12-dev Commit: 3af2d650372b8e0e85b03133229c1b7c15e4dafc Parents: 05b790a Author: mohammadshahidkhan <mohdshahidkhan1...@gmail.com> Authored: Fri Mar 24 10:33:02 2017 +0530 Committer: ravipesala <ravi.pes...@gmail.com> Committed: Thu Apr 6 14:56:37 2017 +0530 ---------------------------------------------------------------------- .../core/scan/expression/RangeExpressionEvaluator.java | 7 +++++++ .../newflow/steps/DataConverterProcessorStepImpl.java | 8 ++++++++ .../processing/store/CarbonFactDataHandlerColumnar.java | 5 ----- 3 files changed, 15 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3af2d650/core/src/main/java/org/apache/carbondata/core/scan/expression/RangeExpressionEvaluator.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/carbondata/core/scan/expression/RangeExpressionEvaluator.java b/core/src/main/java/org/apache/carbondata/core/scan/expression/RangeExpressionEvaluator.java index 7cabf10..d857fd8 100644 --- a/core/src/main/java/org/apache/carbondata/core/scan/expression/RangeExpressionEvaluator.java +++ b/core/src/main/java/org/apache/carbondata/core/scan/expression/RangeExpressionEvaluator.java @@ -22,6 +22,8 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import org.apache.carbondata.common.logging.LogService; +import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.metadata.datatype.DataType; import 
org.apache.carbondata.core.scan.expression.conditional.GreaterThanEqualToExpression; @@ -41,6 +43,8 @@ import static org.apache.carbondata.core.scan.filter.intf.ExpressionType.LESSTHA import static org.apache.carbondata.core.scan.filter.intf.ExpressionType.LESSTHAN_EQUALTO; public class RangeExpressionEvaluator { + private static final LogService LOG = + LogServiceFactory.getLogService(RangeExpressionEvaluator.class.getName()); private Expression expr; private Expression srcNode; private Expression srcParentNode; @@ -160,6 +164,9 @@ public class RangeExpressionEvaluator { } if ((null != startMin) && (null != endMax)) { + LOG.info( + "GreaterThan and LessThan Filter Expression changed to Range Expression for column " + + colName); // the node can be converted to RANGE. Expression n1 = startMin.getCurrentExp(); Expression n2 = endMax.getCurrentExp(); http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3af2d650/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java index cb6baf4..cc99469 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/steps/DataConverterProcessorStepImpl.java @@ -37,6 +37,7 @@ import org.apache.carbondata.processing.newflow.converter.impl.RowConverterImpl; import org.apache.carbondata.processing.newflow.row.CarbonRow; import org.apache.carbondata.processing.newflow.row.CarbonRowBatch; import org.apache.carbondata.processing.surrogatekeysgenerator.csvbased.BadRecordsLogger; +import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; /** * 
Replace row data fields with dictionary values if column is configured dictionary encoded. @@ -171,6 +172,13 @@ public class DataConverterProcessorStepImpl extends AbstractDataLoadProcessorSte if (!closed) { if (null != badRecordLogger) { badRecordLogger.closeStreams(); + // rename the bad record in progress to normal + CarbonTableIdentifier identifier = + configuration.getTableIdentifier().getCarbonTableIdentifier(); + CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal( + identifier.getDatabaseName() + File.separator + identifier.getTableName() + + File.separator + configuration.getSegmentId() + File.separator + configuration + .getTaskNo()); } super.close(); if (converters != null) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/3af2d650/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index da75428..0e6a49d 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -76,7 +76,6 @@ import org.apache.carbondata.processing.store.file.IFileManagerComposite; import org.apache.carbondata.processing.store.writer.CarbonDataWriterVo; import org.apache.carbondata.processing.store.writer.CarbonFactDataWriter; import org.apache.carbondata.processing.store.writer.exception.CarbonDataWriterException; -import org.apache.carbondata.processing.util.CarbonDataProcessorUtil; import org.apache.carbondata.processing.util.NonDictionaryUtil; import org.apache.spark.sql.types.Decimal; @@ -952,10 +951,6 @@ public class CarbonFactDataHandlerColumnar implements 
CarbonFactHandler { LOGGER.info("All blocklets have been finished writing"); // close all the open stream for both the files this.dataWriter.closeWriter(); - // rename the bad record in progress to normal - CarbonDataProcessorUtil.renameBadRecordsFromInProgressToNormal( - this.databaseName + File.separator + this.tableName + File.separator + this.segmentId - + File.separator + this.carbonDataFileAttributes.getTaskId()); } this.dataWriter = null; this.keyBlockHolder = null;