Repository: incubator-carbondata Updated Branches: refs/heads/master a6c8d2a79 -> 716517619
Fix load fail with unsafe enabled and with bigdecimal datatypes Project: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/commit/aca59cea Tree: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/tree/aca59cea Diff: http://git-wip-us.apache.org/repos/asf/incubator-carbondata/diff/aca59cea Branch: refs/heads/master Commit: aca59cea38df7a5509d1f4c96817c2ef7cd7dd9a Parents: a6c8d2a Author: ravipesala <ravi.pes...@gmail.com> Authored: Fri Feb 3 19:35:13 2017 +0530 Committer: jackylk <jacky.li...@huawei.com> Committed: Tue Feb 21 09:06:12 2017 +0800 ---------------------------------------------------------------------- .../bigdecimal/TestDimensionWithDecimalDataType.scala | 14 ++++++++++---- .../newflow/sort/unsafe/UnsafeCarbonRowPage.java | 3 +-- .../unsafe/holder/UnsafeSortTempFileChunkHolder.java | 3 +-- .../unsafe/merger/UnsafeIntermediateFileMerger.java | 5 +---- .../store/CarbonFactDataHandlerColumnar.java | 1 + 5 files changed, 14 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/aca59cea/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/bigdecimal/TestDimensionWithDecimalDataType.scala ---------------------------------------------------------------------- diff --git a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/bigdecimal/TestDimensionWithDecimalDataType.scala b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/bigdecimal/TestDimensionWithDecimalDataType.scala index 743b39b..e267763 100644 --- a/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/bigdecimal/TestDimensionWithDecimalDataType.scala +++ b/integration/spark-common-test/src/test/scala/org/apache/carbondata/integration/spark/testsuite/bigdecimal/TestDimensionWithDecimalDataType.scala @@ -39,10 +39,16 @@ class TestDimensionWithDecimalDataType extends QueryTest with BeforeAndAfterAll sql(s"LOAD DATA local inpath '$resourcesPath/decimalDataWithoutHeader.csv' INTO table hiveTable") } -// test("test detail query on dimension column with decimal data type") { -// checkAnswer(sql("select salary from carbonTable order by salary"), -// sql("select salary from hiveTable order by salary")) -// } + test("test unsafe with bigdecimal") { + sql("drop table if exists unsafecarbonTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "true") + sql("CREATE TABLE IF NOT EXISTS unsafecarbonTable (ID Int, date Timestamp, country String, name String, phonetype String, serialname String, salary Decimal(19,2))STORED BY 'org.apache.carbondata.format'") + sql(s"LOAD DATA LOCAL INPATH '$resourcesPath/decimalDataWithHeader.csv' into table unsafecarbonTable") + CarbonProperties.getInstance() + .addProperty(CarbonCommonConstants.ENABLE_UNSAFE_SORT, "false") + sql("drop table if exists unsafecarbonTable") + } test("test aggregate query on dimension column with decimal data type") { checkAnswer(sql("select sum(salary) from carbonTable"), http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/aca59cea/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java index 561ad38..60af5d8 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/UnsafeCarbonRowPage.java @@ -196,9 +196,8 @@ public class UnsafeCarbonRowPage { size += 2; CarbonUnsafe.unsafe.copyMemory(baseObject, address + size, bigDecimalInBytes, CarbonUnsafe.BYTE_ARRAY_OFFSET, bigDecimalInBytes.length); - BigDecimal val = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); size += bigDecimalInBytes.length; - rowToFill[dimensionSize + mesCount] = val; + rowToFill[dimensionSize + mesCount] = bigDecimalInBytes; } } else { rowToFill[dimensionSize + mesCount] = null; http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/aca59cea/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java index 0c89821..60f259e 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/holder/UnsafeSortTempFileChunkHolder.java @@ -34,7 +34,6 @@ import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonProperties; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.sortandgroupby.exception.CarbonSortKeyAndGroupByException; import org.apache.carbondata.processing.sortandgroupby.sortdata.NewRowComparator; @@ -333,7 +332,7 @@ public class UnsafeSortTempFileChunkHolder implements SortTempChunkHolder { short aShort = stream.readShort(); byte[] bigDecimalInBytes = new byte[aShort]; stream.readFully(bigDecimalInBytes); - row[dimensionCount + mesCount] = DataTypeUtil.byteToBigDecimal(bigDecimalInBytes); + row[dimensionCount + mesCount] = bigDecimalInBytes; } } } http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/aca59cea/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java index c6b5e9a..ed8023b 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java +++ b/processing/src/main/java/org/apache/carbondata/processing/newflow/sort/unsafe/merger/UnsafeIntermediateFileMerger.java @@ -23,7 +23,6 @@ import java.io.File; import java.io.FileNotFoundException; import java.io.FileOutputStream; import java.io.IOException; -import java.math.BigDecimal; import java.nio.ByteBuffer; import java.util.AbstractQueue; import java.util.Arrays; @@ -34,7 +33,6 @@ import org.apache.carbondata.common.logging.LogService; import org.apache.carbondata.common.logging.LogServiceFactory; import org.apache.carbondata.core.constants.CarbonCommonConstants; import org.apache.carbondata.core.util.CarbonUtil; -import org.apache.carbondata.core.util.DataTypeUtil; import org.apache.carbondata.processing.newflow.sort.unsafe.UnsafeCarbonRowPage; import org.apache.carbondata.processing.newflow.sort.unsafe.holder.SortTempChunkHolder; import org.apache.carbondata.processing.newflow.sort.unsafe.holder.UnsafeSortTempFileChunkHolder; @@ -320,8 +318,7 @@ public class UnsafeIntermediateFileMerger implements Callable<Void> { rowData.putLong(size, val); size += 8; } else if (aggType[mesCount] == CarbonCommonConstants.BIG_DECIMAL_MEASURE) { - BigDecimal val = (BigDecimal) value; - byte[] bigDecimalInBytes = DataTypeUtil.bigDecimalToByte(val); + byte[] bigDecimalInBytes = (byte[]) value; rowData.putShort(size, (short)bigDecimalInBytes.length); size += 2; for (int i = 0; i < bigDecimalInBytes.length; i++) { http://git-wip-us.apache.org/repos/asf/incubator-carbondata/blob/aca59cea/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java ---------------------------------------------------------------------- diff --git a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java index 7c48cec..bf66700 100644 --- a/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java +++ b/processing/src/main/java/org/apache/carbondata/processing/store/CarbonFactDataHandlerColumnar.java @@ -1540,6 +1540,7 @@ public class CarbonFactDataHandlerColumnar implements CarbonFactHandler { blockletDataHolder.put(nodeHolder, indexInNodeHolderArray); return null; } catch (Throwable throwable) { + LOGGER.error(throwable, "Error in producer"); consumerExecutorService.shutdownNow(); resetBlockletProcessingCount(); throw new CarbonDataWriterException(throwable.getMessage(), throwable);