This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new 618d4e25c8d [fix][sql] Fix the decimal type error convert in json
schema (#15687)
618d4e25c8d is described below
commit 618d4e25c8d60859135a82cc6778b25fccbf998f
Author: Baodi Shi <[email protected]>
AuthorDate: Thu May 26 08:24:14 2022 +0800
[fix][sql] Fix the decimal type error convert in json schema (#15687)
### Motivation
In the current SQL implementation, when using the `JSON` schema and querying a
decimal field, the following two errors occur:
1. The data type is displayed as varchar instead of decimal.
2. Precision is lost because the value is displayed in scientific notation.
```
presto> select bigdecimal, typeof(bigdecimal) as decimal_type from
pulsar."public/default".test_avro2;
bigdecimal | decimal_type
-----------------------+--------------
1.2345678912345678E36 | varchar
1.2345678912345678E36 | varchar
(2 rows)
```
The original data is: `1234567891234567891234567891234567.89`
### Modifications
- When reading the `JsonNode`, enable `USE_BIG_DECIMAL_FOR_FLOATS` so that
floating-point values are parsed as `BigDecimal` instead of float or double.
- `PulsarJsonFieldDecoder` adds handling for decimal types.
(cherry picked from commit 0c6e2ca24fd368d11a769233bb2041f6cc4a8374)
---
.../impl/schema/generic/GenericJsonReader.java | 16 ++++++++--------
.../decoder/json/PulsarJsonFieldDecoder.java | 22 ++++++++++++++++++++++
.../decoder/json/PulsarJsonRowDecoderFactory.java | 11 +++++++----
.../sql/presto/decoder/avro/TestAvroDecoder.java | 13 +++++++++++--
.../sql/presto/decoder/json/TestJsonDecoder.java | 20 ++++++++++++++++++++
5 files changed, 68 insertions(+), 14 deletions(-)
diff --git
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
index 3284d427a29..1a95e9be152 100644
---
a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
+++
b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/schema/generic/GenericJsonReader.java
@@ -19,8 +19,10 @@
package org.apache.pulsar.client.impl.schema.generic;
import static java.nio.charset.StandardCharsets.UTF_8;
+import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
import java.io.IOException;
import java.io.InputStream;
import java.util.List;
@@ -34,16 +36,13 @@ import org.slf4j.LoggerFactory;
public class GenericJsonReader implements SchemaReader<GenericRecord> {
- private final ObjectMapper objectMapper;
+ private final ObjectReader objectReader;
private final byte[] schemaVersion;
private final List<Field> fields;
private SchemaInfo schemaInfo;
public GenericJsonReader(List<Field> fields, SchemaInfo schemaInfo){
- this.fields = fields;
- this.schemaVersion = null;
- this.objectMapper = new ObjectMapper();
- this.schemaInfo = schemaInfo;
+ this(null, fields, schemaInfo);
}
public GenericJsonReader(List<Field> fields){
@@ -55,16 +54,17 @@ public class GenericJsonReader implements
SchemaReader<GenericRecord> {
}
public GenericJsonReader(byte[] schemaVersion, List<Field> fields,
SchemaInfo schemaInfo){
- this.objectMapper = new ObjectMapper();
this.fields = fields;
this.schemaVersion = schemaVersion;
this.schemaInfo = schemaInfo;
+ ObjectMapper objectMapper = new ObjectMapper();
+ this.objectReader =
objectMapper.reader().with(DeserializationFeature.USE_BIG_DECIMAL_FOR_FLOATS);
}
@Override
public GenericJsonRecord read(byte[] bytes, int offset, int length) {
try {
- JsonNode jn = objectMapper.readTree(new String(bytes, offset,
length, UTF_8));
+ JsonNode jn = objectReader.readTree(new String(bytes, offset,
length, UTF_8));
return new GenericJsonRecord(schemaVersion, fields, jn,
schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
@@ -74,7 +74,7 @@ public class GenericJsonReader implements
SchemaReader<GenericRecord> {
@Override
public GenericRecord read(InputStream inputStream) {
try {
- JsonNode jn = objectMapper.readTree(inputStream);
+ JsonNode jn = objectReader.readTree(inputStream);
return new GenericJsonRecord(schemaVersion, fields, jn,
schemaInfo);
} catch (IOException ioe) {
throw new SchemaSerializationException(ioe);
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
index 5c9dc0c5459..fcd2550f9ba 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonFieldDecoder.java
@@ -57,6 +57,8 @@ import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
import io.prestosql.spi.type.DateType;
+import io.prestosql.spi.type.DecimalType;
+import io.prestosql.spi.type.Decimals;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.MapType;
@@ -69,6 +71,7 @@ import io.prestosql.spi.type.TinyintType;
import io.prestosql.spi.type.Type;
import io.prestosql.spi.type.VarbinaryType;
import io.prestosql.spi.type.VarcharType;
+import java.math.BigInteger;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -118,6 +121,9 @@ public class PulsarJsonFieldDecoder
}
private boolean isSupportedType(Type type) {
+ if (type instanceof DecimalType) {
+ return true;
+ }
if (isVarcharType(type)) {
return true;
}
@@ -226,6 +232,13 @@ public class PulsarJsonFieldDecoder
return floatToIntBits((Float) parseFloat(value.asText()));
}
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ String decimalLong = value.asText().replace(".", "");
+ return Long.valueOf(decimalLong);
+ }
+
long longValue;
if (value.isIntegralNumber() && !value.isBigInteger()) {
longValue = value.longValue();
@@ -265,6 +278,15 @@ public class PulsarJsonFieldDecoder
private static Slice getSlice(JsonNode value, Type type, String
columnName) {
String textValue = value.isValueNode() ? value.asText() :
value.toString();
+
+ // If it is decimalType, need to eliminate the decimal point,
+ // and give it to presto to set the decimal point
+ if (type instanceof DecimalType) {
+ textValue = textValue.replace(".", "");
+ BigInteger bigInteger = new BigInteger(textValue);
+ return Decimals.encodeUnscaledValue(bigInteger);
+ }
+
Slice slice = utf8Slice(textValue);
if (isVarcharType(type)) {
slice = truncateToLength(slice, type);
diff --git
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
index bb064d8909f..90a4fdb18af 100644
---
a/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
+++
b/pulsar-sql/presto-pulsar/src/main/java/org/apache/pulsar/sql/presto/decoder/json/PulsarJsonRowDecoderFactory.java
@@ -33,6 +33,7 @@ import io.prestosql.spi.connector.ColumnMetadata;
import io.prestosql.spi.type.ArrayType;
import io.prestosql.spi.type.BigintType;
import io.prestosql.spi.type.BooleanType;
+import io.prestosql.spi.type.DecimalType;
import io.prestosql.spi.type.DoubleType;
import io.prestosql.spi.type.IntegerType;
import io.prestosql.spi.type.RealType;
@@ -128,11 +129,13 @@ public class PulsarJsonRowDecoderFactory implements
PulsarRowDecoderFactory {
+ "please check the schema or report the
bug.", fieldname));
case FIXED:
case BYTES:
- // In the current implementation, since JsonSchema is
generated by Avro,
- // there may exist LogicalTypes.Decimal.
- // Mapping decimalType with varcharType in JsonSchema.
+ // When the precision <= 0, throw Exception.
+ // When the precision > 0 and <= 18, use ShortDecimalType.
and mapping Long
+ // When the precision > 18 and <= 36, use LongDecimalType.
and mapping Slice
+ // When the precision > 36, throw Exception.
if (logicalType instanceof LogicalTypes.Decimal) {
- return createUnboundedVarcharType();
+ LogicalTypes.Decimal decimal = (LogicalTypes.Decimal)
logicalType;
+ return
DecimalType.createDecimalType(decimal.getPrecision(), decimal.getScale());
}
return VarbinaryType.VARBINARY;
case INT:
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
index 7b270c7995b..2478300dcaa 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/avro/TestAvroDecoder.java
@@ -89,8 +89,6 @@ public class TestAvroDecoder extends AbstractDecoderTester {
message.longField = 222L;
message.timestampField = System.currentTimeMillis();
message.enumField = DecoderTestMessage.TestEnum.TEST_ENUM_1;
- message.decimalField = BigDecimal.valueOf(2233, 2);
- message.longDecimalField = new
BigDecimal("1234567891234567891234567891.23");
LocalTime now = LocalTime.now(ZoneId.systemDefault());
message.timeField = now.toSecondOfDay() * 1000;
@@ -130,6 +128,17 @@ public class TestAvroDecoder extends AbstractDecoderTester
{
PulsarColumnHandle enumFieldColumnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(),
"enumField", VARCHAR, false, false, "enumField", null, null,
PulsarColumnHandle.HandleKeyValueType.NONE);
checkValue(decodedRow, enumFieldColumnHandle,
message.enumField.toString());
+ }
+
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new
BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
PulsarColumnHandle decimalFieldColumnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(),
"decimalField", DecimalType.createDecimalType(4, 2), false,
false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
diff --git
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
index 2a22b58a03f..0b8a8f84eda 100644
---
a/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
+++
b/pulsar-sql/presto-pulsar/src/test/java/org/apache/pulsar/sql/presto/decoder/json/TestJsonDecoder.java
@@ -24,6 +24,7 @@ import io.prestosql.decoder.DecoderColumnHandle;
import io.prestosql.decoder.FieldValueProvider;
import io.prestosql.spi.PrestoException;
import io.prestosql.spi.type.*;
+import java.math.BigDecimal;
import org.apache.pulsar.client.impl.schema.JSONSchema;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonRecord;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
@@ -119,6 +120,25 @@ public class TestJsonDecoder extends AbstractDecoderTester
{
}
+ @Test
+ public void testDecimal() {
+ DecoderTestMessage message = new DecoderTestMessage();
+ message.decimalField = BigDecimal.valueOf(2233, 2);
+ message.longDecimalField = new
BigDecimal("1234567891234567891234567891.23");
+
+ ByteBuf payload = io.netty.buffer.Unpooled
+ .copiedBuffer(schema.encode(message));
+ Map<DecoderColumnHandle, FieldValueProvider> decodedRow =
pulsarRowDecoder.decodeRow(payload).get();
+
+ PulsarColumnHandle decimalFieldColumnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "decimalField", DecimalType.createDecimalType(4, 2), false,
false, "decimalField", null, null, PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, decimalFieldColumnHandle, message.decimalField);
+
+ PulsarColumnHandle longDecimalFieldColumnHandle = new
PulsarColumnHandle(getPulsarConnectorId().toString(),
+ "longDecimalField", DecimalType.createDecimalType(30, 2),
false, false, "longDecimalField", null, null,
PulsarColumnHandle.HandleKeyValueType.NONE);
+ checkValue(decodedRow, longDecimalFieldColumnHandle,
message.longDecimalField);
+ }
+
@Test
public void testArray() {
DecoderTestMessage message = new DecoderTestMessage();