This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b222c13f2f [Bug][connectors-v2] fix mongodb bson convert exception
(#8044)
b222c13f2f is described below
commit b222c13f2fcb53dd21fe63f375140e5d8563ab93
Author: zhangdonghao <[email protected]>
AuthorDate: Thu Nov 14 20:29:21 2024 +0800
[Bug][connectors-v2] fix mongodb bson convert exception (#8044)
---
.../MongoDBConnectorDeserializationSchema.java | 70 +++++++-
.../MongoDBConnectorDeserializationSchemaTest.java | 187 +++++++++++++++++++--
.../mongodb/serde/BsonToRowDataConverters.java | 35 +++-
.../mongodb/serde/BsonToRowDataConvertersTest.java | 93 +++++++++-
4 files changed, 363 insertions(+), 22 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
index edbeca4fab..823ed5d9ec 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/cdc/mongodb/sender/MongoDBConnectorDeserializationSchema.java
@@ -343,6 +343,15 @@ public class MongoDBConnectorDeserializationSchema
return convertToLocalDateTime(bsonValue).toLocalDate();
}
};
+ case TIME:
+ return new SerializableFunction<BsonValue, Object>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object apply(BsonValue bsonValue) {
+ return convertToLocalDateTime(bsonValue).toLocalTime();
+ }
+ };
case TIMESTAMP:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@@ -382,7 +391,7 @@ public class MongoDBConnectorDeserializationSchema
private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
Instant instant;
if (bsonValue.isTimestamp()) {
- instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
+ instant =
Instant.ofEpochSecond(bsonValue.asTimestamp().getValue());
} else if (bsonValue.isDateTime()) {
instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
} else {
@@ -521,7 +530,7 @@ public class MongoDBConnectorDeserializationSchema
}
private static double convertToDouble(@Nonnull BsonValue bsonValue) {
- if (bsonValue.isDouble()) {
+ if (bsonValue.isNumber()) {
return bsonValue.asNumber().doubleValue();
}
throw new MongodbConnectorException(
@@ -532,9 +541,20 @@ public class MongoDBConnectorDeserializationSchema
+ bsonValue.getBsonType());
}
- private static int convertToInt(@Nonnull BsonValue bsonValue) {
+ private static int convertToInt(BsonValue bsonValue) {
if (bsonValue.isInt32()) {
- return bsonValue.asNumber().intValue();
+ return bsonValue.asInt32().getValue();
+ } else if (bsonValue.isNumber()) {
+ long longValue = bsonValue.asNumber().longValue();
+ if (longValue > Integer.MAX_VALUE || longValue <
Integer.MIN_VALUE) {
+ throw new MongodbConnectorException(
+ UNSUPPORTED_DATA_TYPE,
+ "Unable to convert to integer from unexpected value '"
+ + bsonValue
+ + "' of type "
+ + bsonValue.getBsonType());
+ }
+ return (int) longValue;
}
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
@@ -568,8 +588,19 @@ public class MongoDBConnectorDeserializationSchema
"Unsupported BYTES value type: " +
bsonValue.getClass().getSimpleName());
}
- private static long convertToLong(@Nonnull BsonValue bsonValue) {
- if (bsonValue.isInt64()) {
+ private static long convertToLong(BsonValue bsonValue) {
+ if (bsonValue.isInt64() || bsonValue.isInt32()) {
+ return bsonValue.asNumber().longValue();
+ } else if (bsonValue.isDouble()) {
+ double value = bsonValue.asNumber().doubleValue();
+ if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) {
+ throw new MongodbConnectorException(
+ UNSUPPORTED_DATA_TYPE,
+ "Unable to convert to long from unexpected value '"
+ + bsonValue
+ + "' of type "
+ + bsonValue.getBsonType());
+ }
return bsonValue.asNumber().longValue();
}
throw new MongodbConnectorException(
@@ -599,4 +630,31 @@ public class MongoDBConnectorDeserializationSchema
+ "' of type "
+ bsonValue.getBsonType());
}
+
+ @VisibleForTesting
+ public Object convertToObject(
+ @Nonnull SeaTunnelDataType<?> dataType, @Nonnull BsonValue
bsonValue) {
+ switch (dataType.getSqlType()) {
+ case INT:
+ return convertToInt(bsonValue);
+ case BIGINT:
+ return convertToLong(bsonValue);
+ case DOUBLE:
+ return convertToDouble(bsonValue);
+ case STRING:
+ return convertToString(bsonValue);
+ case DATE:
+ return convertToLocalDateTime(bsonValue).toLocalDate();
+ case TIME:
+ return convertToLocalDateTime(bsonValue).toLocalTime();
+ case TIMESTAMP:
+ return convertToLocalDateTime(bsonValue);
+ case DECIMAL:
+ DecimalType decimalType = (DecimalType) dataType;
+ BigDecimal decimalValue = convertToBigDecimal(bsonValue);
+ return fromBigDecimal(
+ decimalValue, decimalType.getPrecision(),
decimalType.getScale());
+ }
+ return null;
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
index 1b90490093..91c9fb47bc 100644
---
a/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
+++
b/seatunnel-connectors-v2/connector-cdc/connector-cdc-mongodb/src/test/java/mongodb/sender/MongoDBConnectorDeserializationSchemaTest.java
@@ -22,19 +22,37 @@ import
org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.exception.MongodbConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.sender.MongoDBConnectorDeserializationSchema;
import
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.MongodbRecordUtils;
import org.apache.kafka.connect.source.SourceRecord;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
import org.bson.BsonDocument;
+import org.bson.BsonDouble;
+import org.bson.BsonInt32;
import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
import org.bson.BsonString;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
import java.util.Collections;
import java.util.Map;
+import java.util.Objects;
+import java.util.stream.IntStream;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.COLL_FIELD;
import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.config.MongodbSourceOptions.DB_FIELD;
@@ -52,25 +70,65 @@ import static
org.apache.seatunnel.connectors.seatunnel.cdc.mongodb.utils.Mongod
public class MongoDBConnectorDeserializationSchemaTest {
- @Test
- public void extractTableId() {
- CatalogTable catalogTable =
+ private static TableSchema tableSchema;
+ private static CatalogTable catalogTable;
+
+ @BeforeAll
+ public static void setUp() {
+ tableSchema =
+ TableSchema.builder()
+ .column(PhysicalColumn.of("int", BasicType.INT_TYPE,
1L, true, null, ""))
+ .column(PhysicalColumn.of("long", BasicType.LONG_TYPE,
1L, true, null, ""))
+ .column(
+ PhysicalColumn.of(
+ "double", BasicType.DOUBLE_TYPE, 1L,
true, null, ""))
+ .column(
+ PhysicalColumn.of(
+ "decimal", new DecimalType(10, 2), 1L,
true, null, ""))
+ .column(
+ PhysicalColumn.of(
+ "string", BasicType.STRING_TYPE, 200L,
true, null, ""))
+ .column(
+ PhysicalColumn.of(
+ "date",
+ LocalTimeType.LOCAL_DATE_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "time",
+ LocalTimeType.LOCAL_TIME_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .column(
+ PhysicalColumn.of(
+ "timestamp",
+ LocalTimeType.LOCAL_DATE_TIME_TYPE,
+ null,
+ null,
+ true,
+ null,
+ null))
+ .build();
+ catalogTable =
CatalogTable.of(
TableIdentifier.of("catalog", "database", "table"),
- TableSchema.builder()
- .column(
- PhysicalColumn.of(
- "name1",
BasicType.STRING_TYPE, 1L, true, null, ""))
- .column(
- PhysicalColumn.of(
- "name1",
BasicType.STRING_TYPE, 1L, true, null, ""))
- .build(),
+ tableSchema,
Collections.emptyMap(),
Collections.emptyList(),
"comment");
+ }
+
+ @Test
+ public void extractTableId() {
MongoDBConnectorDeserializationSchema schema =
- new MongoDBConnectorDeserializationSchema(
- Collections.singletonList(catalogTable),
Collections.emptyMap());
+ new
MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));
// Build SourceRecord
Map<String, String> partitionMap =
@@ -106,4 +164,107 @@ public class MongoDBConnectorDeserializationSchemaTest {
Object tableId = schema.extractTableIdForTest(sourceRecord);
Assertions.assertEquals("inventory.products", tableId);
}
+
+ @Test
+ public void testBsonConvert() {
+ MongoDBConnectorDeserializationSchema schema =
+ new
MongoDBConnectorDeserializationSchema(Collections.singletonList(catalogTable));
+ // check int
+ Assertions.assertEquals(
+ 123456, schema.convertToObject(getDataType("int"), new
BsonInt32(123456)));
+ Assertions.assertEquals(
+ Integer.MAX_VALUE,
+ schema.convertToObject(getDataType("int"), new
BsonInt64(Integer.MAX_VALUE)));
+ Assertions.assertEquals(
+ 123456, schema.convertToObject(getDataType("int"), new
BsonDouble(123456)));
+ Assertions.assertThrowsExactly(
+ MongodbConnectorException.class,
+ () ->
+ schema.convertToObject(
+ getDataType("int"), new
BsonDouble(1234567890123456789.0d)));
+ Assertions.assertThrowsExactly(
+ MongodbConnectorException.class,
+ () -> schema.convertToObject(getDataType("int"), new
BsonInt64(Long.MIN_VALUE)));
+ // check long
+ Assertions.assertEquals(
+ 123456L, schema.convertToObject(getDataType("long"), new
BsonInt32(123456)));
+ Assertions.assertEquals(
+ (long) Integer.MAX_VALUE,
+ schema.convertToObject(getDataType("long"), new
BsonInt64(Integer.MAX_VALUE)));
+ Assertions.assertEquals(
+ 123456L, schema.convertToObject(getDataType("long"), new
BsonDouble(123456)));
+ Assertions.assertThrowsExactly(
+ MongodbConnectorException.class,
+ () ->
+ schema.convertToObject(
+ getDataType("long"),
+ new
BsonDouble(12345678901234567891234567890123456789.0d)));
+
+ // check double
+ Assertions.assertEquals(
+ 1.0d, schema.convertToObject(getDataType("double"), new
BsonInt32(1)));
+ Assertions.assertEquals(
+ 1.0d, schema.convertToObject(getDataType("double"), new
BsonInt64(1)));
+ Assertions.assertEquals(
+ 4.4d, schema.convertToObject(getDataType("double"), new
BsonDouble(4.4)));
+ // check decimal
+ Assertions.assertEquals(
+ new BigDecimal("3.14"),
+ schema.convertToObject(
+ getDataType("decimal"), new
BsonDecimal128(Decimal128.parse("3.1415926"))));
+ // check string
+ Assertions.assertEquals(
+ "123456", schema.convertToObject(getDataType("string"), new
BsonString("123456")));
+ Assertions.assertEquals(
+ "507f191e810c19729de860ea",
+ schema.convertToObject(
+ getDataType("string"),
+ new BsonObjectId(new
ObjectId("507f191e810c19729de860ea"))));
+ BsonDocument document =
+ new BsonDocument()
+ .append("key", new BsonString("123456"))
+ .append("value", new BsonInt64(123456789L));
+ Assertions.assertEquals(
+ "{\"key\": \"123456\", \"value\": 123456789}",
+ schema.convertToObject(getDataType("string"), document));
+
+ LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
+ long epochMilli =
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+ // check localDate
+ Assertions.assertEquals(
+ now.toLocalDate(),
+ schema.convertToObject(getDataType("date"), new
BsonDateTime(epochMilli)));
+ Assertions.assertEquals(
+ now.toLocalDate(),
+ schema.convertToObject(getDataType("date"), new
BsonDateTime(epochMilli)));
+ // check localTime
+ Assertions.assertEquals(
+ now.toLocalTime(),
+ schema.convertToObject(getDataType("time"), new
BsonDateTime(epochMilli)));
+ Assertions.assertEquals(
+ now.toLocalTime(),
+ schema.convertToObject(getDataType("time"), new
BsonDateTime(epochMilli)));
+ // check localDateTime
+ Assertions.assertEquals(
+ now,
+ schema.convertToObject(getDataType("timestamp"), new
BsonDateTime(epochMilli)));
+ Assertions.assertEquals(
+ now,
+ schema.convertToObject(getDataType("timestamp"), new
BsonDateTime(epochMilli)));
+ }
+
+ private SeaTunnelDataType<?> getDataType(String fieldName) {
+ String[] fieldNames = tableSchema.getFieldNames();
+ return IntStream.range(0, fieldNames.length)
+ .mapToObj(
+ i -> {
+ if (fieldName.equals(fieldNames[i])) {
+ return
tableSchema.getColumns().get(i).getDataType();
+ }
+ return null;
+ })
+ .filter(Objects::nonNull)
+ .findFirst()
+ .orElseThrow(() -> new RuntimeException("not found field"));
+ }
}
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
index 8eda6612c7..4993a0db46 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/main/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConverters.java
@@ -178,6 +178,15 @@ public class BsonToRowDataConverters implements
Serializable {
return convertToLocalDateTime(bsonValue).toLocalDate();
}
};
+ case TIME:
+ return new SerializableFunction<BsonValue, Object>() {
+ private static final long serialVersionUID = 1L;
+
+ @Override
+ public Object apply(BsonValue bsonValue) {
+ return convertToLocalDateTime(bsonValue).toLocalTime();
+ }
+ };
case TIMESTAMP:
return new SerializableFunction<BsonValue, Object>() {
private static final long serialVersionUID = 1L;
@@ -217,7 +226,7 @@ public class BsonToRowDataConverters implements
Serializable {
private static LocalDateTime convertToLocalDateTime(BsonValue bsonValue) {
Instant instant;
if (bsonValue.isTimestamp()) {
- instant = Instant.ofEpochSecond(bsonValue.asTimestamp().getTime());
+ instant = Instant.ofEpochMilli(bsonValue.asTimestamp().getValue());
} else if (bsonValue.isDateTime()) {
instant = Instant.ofEpochMilli(bsonValue.asDateTime().getValue());
} else {
@@ -366,7 +375,18 @@ public class BsonToRowDataConverters implements
Serializable {
private static int convertToInt(BsonValue bsonValue) {
if (bsonValue.isInt32()) {
- return bsonValue.asNumber().intValue();
+ return bsonValue.asInt32().getValue();
+ } else if (bsonValue.isNumber()) {
+ long longValue = bsonValue.asNumber().longValue();
+ if (longValue > Integer.MAX_VALUE || longValue <
Integer.MIN_VALUE) {
+ throw new MongodbConnectorException(
+ UNSUPPORTED_DATA_TYPE,
+ "Unable to convert to integer from unexpected value '"
+ + bsonValue
+ + "' of type "
+ + bsonValue.getBsonType());
+ }
+ return (int) longValue;
}
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
@@ -403,6 +423,17 @@ public class BsonToRowDataConverters implements
Serializable {
private static long convertToLong(BsonValue bsonValue) {
if (bsonValue.isInt64() || bsonValue.isInt32()) {
return bsonValue.asNumber().longValue();
+ } else if (bsonValue.isDouble()) {
+ double value = bsonValue.asNumber().doubleValue();
+ if (value > Long.MAX_VALUE || value < Long.MIN_VALUE) {
+ throw new MongodbConnectorException(
+ UNSUPPORTED_DATA_TYPE,
+ "Unable to convert to long from unexpected value '"
+ + bsonValue
+ + "' of type "
+ + bsonValue.getBsonType());
+ }
+ return bsonValue.asNumber().longValue();
}
throw new MongodbConnectorException(
UNSUPPORTED_DATA_TYPE,
diff --git
a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
index b47769c0ac..26c268a4e7 100644
---
a/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
+++
b/seatunnel-connectors-v2/connector-mongodb/src/test/java/org/apache/seatunnel/connectors/seatunnel/mongodb/serde/BsonToRowDataConvertersTest.java
@@ -18,13 +18,29 @@
package org.apache.seatunnel.connectors.seatunnel.mongodb.serde;
import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.DecimalType;
+import org.apache.seatunnel.api.table.type.LocalTimeType;
+import
org.apache.seatunnel.connectors.seatunnel.mongodb.exception.MongodbConnectorException;
+import org.bson.BsonDateTime;
+import org.bson.BsonDecimal128;
+import org.bson.BsonDocument;
import org.bson.BsonDouble;
import org.bson.BsonInt32;
import org.bson.BsonInt64;
+import org.bson.BsonObjectId;
+import org.bson.BsonString;
+import org.bson.BsonTimestamp;
+import org.bson.types.Decimal128;
+import org.bson.types.ObjectId;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
+import java.math.BigDecimal;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
+
public class BsonToRowDataConvertersTest {
private final BsonToRowDataConverters converterFactory = new
BsonToRowDataConverters();
@@ -42,7 +58,7 @@ public class BsonToRowDataConvertersTest {
}
@Test
- public void testConvertBsonIntToBigInt() {
+ public void testConvertBsonNumberToLong() {
// It covered #7567
BsonToRowDataConverters.BsonToRowDataConverter converter =
converterFactory.createConverter(BasicType.LONG_TYPE);
@@ -51,5 +67,80 @@ public class BsonToRowDataConvertersTest {
Assertions.assertEquals(
(long) Integer.MAX_VALUE, converter.convert(new
BsonInt64(Integer.MAX_VALUE)));
+
+ Assertions.assertEquals(123456L, converter.convert(new
BsonDouble(123456)));
+
+ Assertions.assertThrowsExactly(
+ MongodbConnectorException.class,
+ () -> converter.convert(new
BsonDouble(12345678901234567891234567890123456789.0d)));
+ }
+
+ @Test
+ public void testConvertBsonNumberToInt() {
+ // Covers issue #8042
+ BsonToRowDataConverters.BsonToRowDataConverter converter =
+ converterFactory.createConverter(BasicType.INT_TYPE);
+ Assertions.assertEquals(123456, converter.convert(new
BsonInt32(123456)));
+ Assertions.assertEquals(
+ Integer.MAX_VALUE, converter.convert(new
BsonInt64(Integer.MAX_VALUE)));
+ Assertions.assertEquals(123456, converter.convert(new
BsonDouble(123456)));
+ Assertions.assertThrowsExactly(
+ MongodbConnectorException.class,
+ () -> converter.convert(new
BsonDouble(1234567890123456789.0d)));
+ }
+
+ @Test
+ public void testConvertBsonDecimal128ToDecimal() {
+ BsonToRowDataConverters.BsonToRowDataConverter converter =
+ converterFactory.createConverter(new DecimalType(10, 2));
+ Assertions.assertEquals(
+ new BigDecimal("3.14"),
+ converter.convert(new
BsonDecimal128(Decimal128.parse("3.1415926"))));
+ }
+
+ @Test
+ public void testConvertBsonToString() {
+ BsonToRowDataConverters.BsonToRowDataConverter converter =
+ converterFactory.createConverter(BasicType.STRING_TYPE);
+ Assertions.assertEquals("123456", converter.convert(new
BsonString("123456")));
+
+ Assertions.assertEquals(
+ "507f191e810c19729de860ea",
+ converter.convert(new BsonObjectId(new
ObjectId("507f191e810c19729de860ea"))));
+
+ BsonDocument document =
+ new BsonDocument()
+ .append("key", new BsonString("123456"))
+ .append("value", new BsonInt64(123456789L));
+ Assertions.assertEquals(
+ "{\"key\": \"123456\", \"value\": 123456789}",
converter.convert(document));
+ }
+
+ @Test
+ public void testConvertBsonToLocalDateTime() {
+ LocalDateTime now = LocalDateTime.now().truncatedTo(ChronoUnit.MILLIS);
+ long epochMilli =
now.atZone(ZoneId.systemDefault()).toInstant().toEpochMilli();
+
+ // localDateTime converter
+ BsonToRowDataConverters.BsonToRowDataConverter localDataTimeConverter =
+
converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TIME_TYPE);
+ Assertions.assertEquals(now, localDataTimeConverter.convert(new
BsonTimestamp(epochMilli)));
+ Assertions.assertEquals(now, localDataTimeConverter.convert(new
BsonDateTime(epochMilli)));
+
+ // localDate converter
+ BsonToRowDataConverters.BsonToRowDataConverter localDataConverter =
+
converterFactory.createConverter(LocalTimeType.LOCAL_DATE_TYPE);
+ Assertions.assertEquals(
+ now.toLocalDate(), localDataConverter.convert(new
BsonTimestamp(epochMilli)));
+ Assertions.assertEquals(
+ now.toLocalDate(), localDataConverter.convert(new
BsonDateTime(epochMilli)));
+
+ // localTime converter
+ BsonToRowDataConverters.BsonToRowDataConverter localTimeConverter =
+
converterFactory.createConverter(LocalTimeType.LOCAL_TIME_TYPE);
+ Assertions.assertEquals(
+ now.toLocalTime(), localTimeConverter.convert(new
BsonTimestamp(epochMilli)));
+ Assertions.assertEquals(
+ now.toLocalTime(), localTimeConverter.convert(new
BsonDateTime(epochMilli)));
}
}