This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 2b595a78 [Fix](source) Fixed incorrect Map value reading when 
key/value is of DATE or DATETIME type (#419)
2b595a78 is described below

commit 2b595a7819378650052a5beac4c0b707876b23ab
Author: bingquanzhao <bingquan_z...@icloud.com>
AuthorDate: Mon Jul 15 10:16:33 2024 +0800

    [Fix](source) Fixed incorrect Map value reading when key/value is of DATE 
or DATETIME type (#419)
---
 .../apache/doris/flink/serialization/RowBatch.java | 37 ++++++++++++++++++----
 .../doris/flink/serialization/TestRowBatch.java    | 29 +++++++++++++++++
 2 files changed, 59 insertions(+), 7 deletions(-)

diff --git 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index c7afe7f5..1a42b2b9 100644
--- 
a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ 
b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -40,7 +40,10 @@ import org.apache.arrow.vector.VectorSchemaRoot;
 import org.apache.arrow.vector.complex.ListVector;
 import org.apache.arrow.vector.complex.MapVector;
 import org.apache.arrow.vector.complex.StructVector;
+import org.apache.arrow.vector.complex.impl.DateDayReaderImpl;
+import org.apache.arrow.vector.complex.impl.TimeStampMicroReaderImpl;
 import org.apache.arrow.vector.complex.impl.UnionMapReader;
+import org.apache.arrow.vector.complex.reader.FieldReader;
 import org.apache.arrow.vector.ipc.ArrowStreamReader;
 import org.apache.arrow.vector.types.Types;
 import org.apache.doris.flink.exception.DorisException;
@@ -105,6 +108,7 @@ public class RowBatch {
     private final DateTimeFormatter dateTimeV2Formatter =
             DateTimeFormatter.ofPattern(DATETIMEV2_PATTERN);
     private final DateTimeFormatter dateFormatter = 
DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    private static final ZoneId DEFAULT_ZONE_ID = ZoneId.systemDefault();
 
     public List<Row> getRowBatch() {
         return rowBatch;
@@ -454,7 +458,11 @@ public class RowBatch {
                 reader.setPosition(rowIndex);
                 Map<String, Object> mapValue = new HashMap<>();
                 while (reader.next()) {
-                    mapValue.put(reader.key().readObject().toString(), 
reader.value().readObject());
+                    FieldReader keyReader = reader.key();
+                    FieldReader valueReader = reader.value();
+                    Object mapKeyObj = handleMapFieldReader(keyReader);
+                    Object mapValueObj = handleMapFieldReader(valueReader);
+                    mapValue.put(mapKeyObj.toString(), mapValueObj);
                 }
                 addValueToRow(rowIndex, mapValue);
                 break;
@@ -478,6 +486,16 @@ public class RowBatch {
         return true;
     }
 
+    private Object handleMapFieldReader(FieldReader reader) {
+        if (reader instanceof TimeStampMicroReaderImpl) {
+            return longToLocalDateTime(reader.readLong());
+        }
+        if (reader instanceof DateDayReaderImpl) {
+            return LocalDate.ofEpochDay(((Integer) 
reader.readObject()).longValue());
+        }
+        return reader.readObject();
+    }
+
     @VisibleForTesting
     public LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) {
         TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector;
@@ -485,16 +503,21 @@ public class RowBatch {
             return null;
         }
         long time = vector.get(rowIndex);
+        return longToLocalDateTime(time);
+    }
+
+    @VisibleForTesting
+    public static LocalDateTime longToLocalDateTime(long time) {
         Instant instant;
-        if (time / 10000000000L == 0) { // datetime(0)
+        // Determine the timestamp accuracy and process it
+        if (time < 10_000_000_000L) { // Second timestamp
             instant = Instant.ofEpochSecond(time);
-        } else if (time / 10000000000000L == 0) { // datetime(3)
+        } else if (time < 10_000_000_000_000L) { // milli second
             instant = Instant.ofEpochMilli(time);
-        } else { // datetime(6)
-            instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 
1000);
+        } else { // micro second
+            instant = Instant.ofEpochSecond(time / 1_000_000, (time % 
1_000_000) * 1_000);
         }
-        LocalDateTime dateTime = LocalDateTime.ofInstant(instant, 
ZoneId.systemDefault());
-        return dateTime;
+        return LocalDateTime.ofInstant(instant, DEFAULT_ZONE_ID);
     }
 
     @VisibleForTesting
diff --git 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 4824180c..e1f7b6e5 100644
--- 
a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ 
b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -76,6 +76,7 @@ import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.time.ZoneId;
+import java.time.temporal.ChronoUnit;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -1629,4 +1630,32 @@ public class TestRowBatch {
         thrown.expectMessage(startsWith("Get row offset"));
         rowBatch.addValueToRow(10, null);
     }
+
+    @Test
+    public void longToLocalDateTimeTest() {
+        ZoneId defaultZoneId = ZoneId.systemDefault();
+        LocalDateTime now = 
LocalDateTime.now(defaultZoneId).truncatedTo(ChronoUnit.MICROS);
+
+        long secondTimestamp = 
now.toEpochSecond(defaultZoneId.getRules().getOffset(now));
+        long milliTimestamp = 
now.atZone(defaultZoneId).toInstant().toEpochMilli();
+        long microTimestamp =
+                
now.toInstant(defaultZoneId.getRules().getOffset(now)).getEpochSecond() * 
1_000_000
+                        + now.getNano() / 1_000;
+
+        LocalDateTime dateTime1 = 
RowBatch.longToLocalDateTime(secondTimestamp);
+        LocalDateTime dateTime2 = RowBatch.longToLocalDateTime(milliTimestamp);
+        LocalDateTime dateTime3 = RowBatch.longToLocalDateTime(microTimestamp);
+
+        long result1 = 
dateTime1.atZone(defaultZoneId).toInstant().getEpochSecond();
+        long result2 = 
dateTime2.atZone(defaultZoneId).toInstant().toEpochMilli();
+        long result3 =
+                
dateTime3.toInstant(defaultZoneId.getRules().getOffset(dateTime3)).getEpochSecond()
+                                * 1_000_000
+                        + dateTime3.getNano() / 1_000;
+
+        long[] expectArray = {secondTimestamp, milliTimestamp, microTimestamp};
+        long[] resultArray = {result1, result2, result3};
+
+        Assert.assertArrayEquals(expectArray, resultArray);
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@doris.apache.org
For additional commands, e-mail: commits-h...@doris.apache.org

Reply via email to