This is an automated email from the ASF dual-hosted git repository. diwu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git
The following commit(s) were added to refs/heads/master by this push: new 2b595a78 [Fix](source) Fixed incorrect Map value reading when key/value is of DATE or DATETIME type (#419) 2b595a78 is described below commit 2b595a7819378650052a5beac4c0b707876b23ab Author: bingquanzhao <bingquan_z...@icloud.com> AuthorDate: Mon Jul 15 10:16:33 2024 +0800 [Fix](source) Fixed incorrect Map value reading when key/value is of DATE or DATETIME type (#419) --- .../apache/doris/flink/serialization/RowBatch.java | 37 ++++++++++++++++++---- .../doris/flink/serialization/TestRowBatch.java | 29 +++++++++++++++++ 2 files changed, 59 insertions(+), 7 deletions(-) diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java index c7afe7f5..1a42b2b9 100644 --- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java +++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java @@ -40,7 +40,10 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.StructVector; +import org.apache.arrow.vector.complex.impl.DateDayReaderImpl; +import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl; import org.apache.arrow.vector.complex.impl.UnionMapReader; +import org.apache.arrow.vector.complex.reader.FieldReader; import org.apache.arrow.vector.ipc.ArrowStreamReader; import org.apache.arrow.vector.types.Types; import org.apache.doris.flink.exception.DorisException; @@ -105,6 +108,7 @@ public class RowBatch { private final DateTimeFormatter dateTimeV2Formatter = DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN); private final DateTimeFormatter dateFormatter = DateTimeFormatter.ofPattern("yyyy-MM-dd"); + private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault(); public List<Row> getRowBatch() { return rowBatch; @@ -454,7 +458,11 @@ public class RowBatch { reader.setPosition(rowIndex); Map<String, Object> mapValue = new HashMap<>(); while (reader.next()) { - mapValue.put(reader.key().readObject().toString(), reader.value().readObject()); + FieldReader keyReader = reader.key(); + FieldReader valueReader = reader.value(); + Object mapKeyObj = handleMapFieldReader(keyReader); + Object mapValueObj = handleMapFieldReader(valueReader); + mapValue.put(mapKeyObj.toString(), mapValueObj); } addValueToRow(rowIndex, mapValue); break; @@ -478,6 +486,16 @@ public class RowBatch { return true; } + private Object handleMapFieldReader(FieldReader reader) { + if (reader instanceof TimeStampMicroReaderImpl) { + return longToLocalDateTime(reader.readLong()); + } + if (reader instanceof DateDayReaderImpl) { + return LocalDate.ofEpochDay(((Integer) reader.readObject()).longValue()); + } + return reader.readObject(); + } + @VisibleForTesting public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) { TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector; @@ -485,16 +503,21 @@ public class RowBatch { return null; } long time = vector.get(rowIndex); + return longToLocalDateTime(time); + } + + @VisibleForTesting + public static LocalDateTime longToLocalDateTime(long time) { Instant instant; - if (time / 10000000000L == 0) { // datetime(0) + // Determine the timestamp accuracy and process it + if (time < 10_000_000_000L) { // Second timestamp instant = Instant.ofEpochSecond(time); - } else if (time / 10000000000000L == 0) { // datetime(3) + } else if (time < 10_000_000_000_000L) { // milli second instant = Instant.ofEpochMilli(time); - } else { // datetime(6) - instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 1000); + } else { // micro second + instant = Instant.ofEpochSecond(time / 1_000_000, (time % 1_000_000) * 1_000); } - LocalDateTime dateTime = LocalDateTime.ofInstant(instant, ZoneId.systemDefault()); - return dateTime; + return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID); } @VisibleForTesting diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java index 4824180c..e1f7b6e5 100644 --- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java +++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java @@ -76,6 +76,7 @@ import java.nio.charset.StandardCharsets; import java.time.LocalDate; import java.time.LocalDateTime; import java.time.ZoneId; +import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -1629,4 +1630,32 @@ public class TestRowBatch { thrown.expectMessage(startsWith("Get row offset")); rowBatch.addValueToRow(10, null); } + + @Test + public void longToLocalDateTimeTest() { + ZoneId defaultZoneId = ZoneId.systemDefault(); + LocalDateTime now = LocalDateTime.now(defaultZoneId).truncatedTo(ChronoUnit.MICROS); + + long secondTimestamp = now.toEpochSecond(defaultZoneId.getRules().getOffset(now)); + long milliTimestamp = now.atZone(defaultZoneId).toInstant().toEpochMilli(); + long microTimestamp = + now.toInstant(defaultZoneId.getRules().getOffset(now)).getEpochSecond() * 1_000_000 + + now.getNano() / 1_000; + + LocalDateTime dateTime1 = RowBatch.longToLocalDateTime(secondTimestamp); + LocalDateTime dateTime2 = RowBatch.longToLocalDateTime(milliTimestamp); + LocalDateTime dateTime3 = RowBatch.longToLocalDateTime(microTimestamp); + + long result1 = dateTime1.atZone(defaultZoneId).toInstant().getEpochSecond(); + long result2 = dateTime2.atZone(defaultZoneId).toInstant().toEpochMilli(); + long result3 = + dateTime3.toInstant(defaultZoneId.getRules().getOffset(dateTime3)).getEpochSecond() + * 1_000_000 + + dateTime3.getNano() / 1_000; + + long[] expectArray = {secondTimestamp, milliTimestamp, microTimestamp}; + long[] resultArray = {result1, result2, result3}; + + Assert.assertArrayEquals(expectArray, resultArray); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org For additional commands, e-mail: commits-h...@doris.apache.org