This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-flink-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 8fbcb41  [improve]Compatible with reading doris date/datetime arrow type modification (#351)
8fbcb41 is described below

commit 8fbcb41393dedeb90010181c46f93ec42680c587
Author: caoliang-web <[email protected]>
AuthorDate: Fri Mar 22 15:45:18 2024 +0800

    [improve]Compatible with reading doris date/datetime arrow type modification (#351)
---
 .../apache/doris/flink/serialization/RowBatch.java |  99 +++++---
 .../doris/flink/serialization/TestRowBatch.java    | 263 +++++++++++++++++++++
 2 files changed, 335 insertions(+), 27 deletions(-)

diff --git a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
index c0c2aec..ed649ca 100644
--- a/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
+++ b/flink-doris-connector/src/main/java/org/apache/doris/flink/serialization/RowBatch.java
@@ -22,6 +22,7 @@ import org.apache.flink.util.Preconditions;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
 import org.apache.arrow.vector.FixedSizeBinaryVector;
@@ -29,6 +30,7 @@ import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -50,8 +52,10 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
+import java.time.Instant;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -274,44 +278,67 @@ public class RowBatch {
                 break;
             case "DATE":
             case "DATEV2":
-                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(Types.MinorType.DATEDAY)
+                        && !minorType.equals(Types.MinorType.VARCHAR)) {
                     return false;
                 }
-                VarCharVector date = (VarCharVector) fieldVector;
-                if (date.isNull(rowIndex)) {
-                    addValueToRow(rowIndex, null);
-                    break;
+                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                    VarCharVector date = (VarCharVector) fieldVector;
+                    if (date.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    String stringValue = new String(date.get(rowIndex));
+                    LocalDate localDate = LocalDate.parse(stringValue, 
dateFormatter);
+                    addValueToRow(rowIndex, localDate);
+                } else {
+                    DateDayVector date = (DateDayVector) fieldVector;
+                    if (date.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    LocalDate localDate = 
LocalDate.ofEpochDay(date.get(rowIndex));
+                    addValueToRow(rowIndex, localDate);
                 }
-                String stringValue = new String(date.get(rowIndex));
-                LocalDate localDate = LocalDate.parse(stringValue, 
dateFormatter);
-                addValueToRow(rowIndex, localDate);
                 break;
             case "DATETIME":
-                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
+                        && !minorType.equals(Types.MinorType.VARCHAR)) {
                     return false;
                 }
-                VarCharVector timeStampSecVector = (VarCharVector) fieldVector;
-                if (timeStampSecVector.isNull(rowIndex)) {
-                    addValueToRow(rowIndex, null);
-                    break;
+                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                    VarCharVector varCharVector = (VarCharVector) fieldVector;
+                    if (varCharVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    String stringValue = new 
String(varCharVector.get(rowIndex));
+                    LocalDateTime parse = LocalDateTime.parse(stringValue, 
dateTimeFormatter);
+                    addValueToRow(rowIndex, parse);
+                } else {
+                    LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
+                    addValueToRow(rowIndex, dateTime);
                 }
-                stringValue = new String(timeStampSecVector.get(rowIndex));
-                LocalDateTime parse = LocalDateTime.parse(stringValue, 
dateTimeFormatter);
-                addValueToRow(rowIndex, parse);
                 break;
             case "DATETIMEV2":
-                if (!minorType.equals(Types.MinorType.VARCHAR)) {
+                if (!minorType.equals(Types.MinorType.TIMESTAMPMICRO)
+                        && !minorType.equals(Types.MinorType.VARCHAR)) {
                     return false;
                 }
-                VarCharVector timeStampV2SecVector = (VarCharVector) 
fieldVector;
-                if (timeStampV2SecVector.isNull(rowIndex)) {
-                    addValueToRow(rowIndex, null);
-                    break;
+                if (minorType.equals(Types.MinorType.VARCHAR)) {
+                    VarCharVector varCharVector = (VarCharVector) fieldVector;
+                    if (varCharVector.isNull(rowIndex)) {
+                        addValueToRow(rowIndex, null);
+                        break;
+                    }
+                    String stringValue = new 
String(varCharVector.get(rowIndex));
+                    stringValue = completeMilliseconds(stringValue);
+                    LocalDateTime parse = LocalDateTime.parse(stringValue, 
dateTimeV2Formatter);
+                    addValueToRow(rowIndex, parse);
+                } else {
+                    LocalDateTime dateTime = getDateTime(rowIndex, 
fieldVector);
+                    addValueToRow(rowIndex, dateTime);
                 }
-                stringValue = new String(timeStampV2SecVector.get(rowIndex));
-                stringValue = completeMilliseconds(stringValue);
-                parse = LocalDateTime.parse(stringValue, dateTimeV2Formatter);
-                addValueToRow(rowIndex, parse);
                 break;
             case "LARGEINT":
                 if (!minorType.equals(Types.MinorType.FIXEDSIZEBINARY)
@@ -342,7 +369,7 @@ public class RowBatch {
                         addValueToRow(rowIndex, null);
                         break;
                     }
-                    stringValue = new String(largeIntVector.get(rowIndex));
+                    String stringValue = new 
String(largeIntVector.get(rowIndex));
                     BigInteger largeInt = new BigInteger(stringValue);
                     addValueToRow(rowIndex, largeInt);
                     break;
@@ -359,7 +386,7 @@ public class RowBatch {
                     addValueToRow(rowIndex, null);
                     break;
                 }
-                stringValue = new String(varCharVector.get(rowIndex));
+                String stringValue = new String(varCharVector.get(rowIndex));
                 addValueToRow(rowIndex, stringValue);
                 break;
             case "ARRAY":
@@ -409,6 +436,24 @@ public class RowBatch {
         return true;
     }
 
+    private LocalDateTime getDateTime(int rowIndex, FieldVector fieldVector) { // Reads row rowIndex of a TIMESTAMPMICRO vector as a LocalDateTime in the JVM default zone; a null cell yields null.
+        TimeStampMicroVector vector = (TimeStampMicroVector) fieldVector; // The DATETIME/DATETIMEV2 branches call this only when minorType is TIMESTAMPMICRO (VARCHAR is parsed inline).
+        if (vector.isNull(rowIndex)) {
+            return null;
+        }
+        long time = vector.get(rowIndex); // Raw value; its unit is guessed from magnitude below. Presumably the sender scales it per datetime(0)/(3)/(6) precision - confirm against Doris BE.
+        Instant instant;
+        if (time / 10000000000L == 0) { // datetime(0): |time| < 1e10 -> epoch seconds. NOTE(review): a true micros value within ~2.8 h of the epoch also lands here.
+            instant = Instant.ofEpochSecond(time);
+        } else if (time / 10000000000000L == 0) { // datetime(3): |time| < 1e13 -> epoch millis. NOTE(review): true micros values before ~1970-04-26 also land here.
+            instant = Instant.ofEpochMilli(time);
+        } else { // datetime(6): epoch micros. The remainder may be negative for pre-1970 values; ofEpochSecond accepts a negative nano adjustment.
+            instant = Instant.ofEpochSecond(time / 1000000, time % 1000000 * 
1000);
+        }
+        LocalDateTime dateTime = LocalDateTime.ofInstant(instant, 
ZoneId.systemDefault()); // NOTE(review): the result depends on the JVM default time zone.
+        return dateTime;
+    }
+
     private String completeMilliseconds(String stringValue) {
         if (stringValue.length() == DATETIMEV2_PATTERN.length()) {
             return stringValue;
diff --git a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
index 6c8adfe..6b29078 100644
--- a/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
+++ b/flink-doris-connector/src/test/java/org/apache/doris/flink/serialization/TestRowBatch.java
@@ -25,12 +25,15 @@ import org.apache.arrow.memory.ArrowBuf;
 import org.apache.arrow.memory.RootAllocator;
 import org.apache.arrow.vector.BigIntVector;
 import org.apache.arrow.vector.BitVector;
+import org.apache.arrow.vector.DateDayVector;
 import org.apache.arrow.vector.DecimalVector;
 import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.FixedSizeBinaryVector;
 import org.apache.arrow.vector.Float4Vector;
 import org.apache.arrow.vector.Float8Vector;
 import org.apache.arrow.vector.IntVector;
 import org.apache.arrow.vector.SmallIntVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
 import org.apache.arrow.vector.TinyIntVector;
 import org.apache.arrow.vector.VarBinaryVector;
 import org.apache.arrow.vector.VarCharVector;
@@ -41,11 +44,14 @@ import org.apache.arrow.vector.complex.impl.NullableStructWriter;
 import org.apache.arrow.vector.complex.impl.UnionMapWriter;
 import org.apache.arrow.vector.dictionary.DictionaryProvider;
 import org.apache.arrow.vector.ipc.ArrowStreamWriter;
+import org.apache.arrow.vector.types.DateUnit;
 import org.apache.arrow.vector.types.FloatingPointPrecision;
+import org.apache.arrow.vector.types.TimeUnit;
 import org.apache.arrow.vector.types.pojo.ArrowType;
 import org.apache.arrow.vector.types.pojo.Field;
 import org.apache.arrow.vector.types.pojo.FieldType;
 import org.apache.arrow.vector.util.Text;
+import org.apache.commons.lang3.ArrayUtils;
 import org.apache.doris.flink.exception.DorisException;
 import org.apache.doris.flink.rest.RestService;
 import org.apache.doris.flink.rest.models.Schema;
@@ -62,9 +68,11 @@ import org.slf4j.LoggerFactory;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.math.BigDecimal;
+import java.math.BigInteger;
 import java.nio.charset.StandardCharsets;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
+import java.time.ZoneId;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -622,4 +630,259 @@ public class TestRowBatch {
         Assert.assertTrue(
                 ImmutableMap.of("a", new Text("a1"), "b", 
1).equals(rowBatch.next().get(0)));
     }
+
+    @Test
+    public void testDate() throws DorisException, IOException { // DATE/DATEV2 read from Utf8 vectors (k1, k2) and from a DateDay vector (k3).
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        childrenBuilder.add(new Field("k2", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        childrenBuilder.add(
+                new Field("k3", FieldType.nullable(new 
ArrowType.Date(DateUnit.DAY)), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(1);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector dateVector = (VarCharVector) vector;
+        dateVector.setInitialCapacity(1);
+        dateVector.allocateNew();
+        dateVector.setIndexDefined(0);
+        dateVector.setValueLengthSafe(0, 10);
+        dateVector.setSafe(0, "2023-08-09".getBytes());
+        vector.setValueCount(1);
+
+        vector = root.getVector("k2");
+        VarCharVector dateV2Vector = (VarCharVector) vector;
+        dateV2Vector.setInitialCapacity(1);
+        dateV2Vector.allocateNew();
+        dateV2Vector.setIndexDefined(0);
+        dateV2Vector.setValueLengthSafe(0, 10);
+        dateV2Vector.setSafe(0, "2023-08-10".getBytes());
+        vector.setValueCount(1);
+
+        vector = root.getVector("k3");
+        DateDayVector dateNewVector = (DateDayVector) vector;
+        dateNewVector.setInitialCapacity(1);
+        dateNewVector.allocateNew();
+        dateNewVector.setIndexDefined(0);
+        dateNewVector.setSafe(0, 19802); // 19802 days after 1970-01-01 = 2024-03-20, the value asserted below.
+        vector.setValueCount(1);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"DATE\",\"name\":\"k1\",\"comment\":\"\"}, "
+                        + 
"{\"type\":\"DATEV2\",\"name\":\"k2\",\"comment\":\"\"}, "
+                        + 
"{\"type\":\"DATEV2\",\"name\":\"k3\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals(LocalDate.of(2023, 8, 9), actualRow0.get(0));
+        Assert.assertEquals(LocalDate.of(2023, 8, 10), actualRow0.get(1));
+        Assert.assertEquals(LocalDate.of(2024, 3, 20), actualRow0.get(2));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class); // Only one row was written, so one more next() must fail.
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
+
+    @Test
+    public void testDateTime() throws IOException, DorisException { // DATETIME from a Utf8 vector (k1) and DATETIMEV2 from a TimeStampMicro vector (k2) holding seconds, millis and micros.
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        childrenBuilder.add(
+                new Field(
+                        "k2",
+                        FieldType.nullable(new 
ArrowType.Timestamp(TimeUnit.MICROSECOND, null)),
+                        null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(3);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector datetimeVector = (VarCharVector) vector;
+        datetimeVector.setInitialCapacity(3);
+        datetimeVector.allocateNew();
+        datetimeVector.setIndexDefined(0);
+        datetimeVector.setValueLengthSafe(0, 20);
+        datetimeVector.setSafe(0, "2024-03-20 00:00:00".getBytes());
+        datetimeVector.setIndexDefined(1);
+        datetimeVector.setValueLengthSafe(1, 20);
+        datetimeVector.setSafe(1, "2024-03-20 00:00:01".getBytes());
+        datetimeVector.setIndexDefined(2);
+        datetimeVector.setValueLengthSafe(2, 20);
+        datetimeVector.setSafe(2, "2024-03-20 00:00:02".getBytes());
+        vector.setValueCount(3);
+
+        LocalDateTime localDateTime = LocalDateTime.of(2024, 3, 20, 0, 0, 0, 
123456000);
+        long second = 
localDateTime.atZone(ZoneId.systemDefault()).toEpochSecond(); // JVM default zone, matching the ZoneId.systemDefault() conversion in RowBatch.getDateTime.
+        int nano = localDateTime.getNano();
+
+        vector = root.getVector("k2");
+        TimeStampMicroVector datetimeV2Vector = (TimeStampMicroVector) vector;
+        datetimeV2Vector.setInitialCapacity(3);
+        datetimeV2Vector.allocateNew();
+        datetimeV2Vector.setIndexDefined(0);
+        datetimeV2Vector.setSafe(0, second); // row 0: epoch seconds (datetime(0) encoding)
+        datetimeV2Vector.setIndexDefined(1);
+        datetimeV2Vector.setSafe(1, second * 1000 + nano / 1000000); // row 1: epoch millis, keeps 123 ms (datetime(3) encoding)
+        datetimeV2Vector.setIndexDefined(2);
+        datetimeV2Vector.setSafe(2, second * 1000000 + nano / 1000); // row 2: epoch micros, keeps 123456 us (datetime(6) encoding)
+        vector.setValueCount(3);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"DATETIME\",\"name\":\"k1\",\"comment\":\"\"}, "
+                        + 
"{\"type\":\"DATETIMEV2\",\"name\":\"k2\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), 
actualRow0.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0), 
actualRow0.get(1));
+
+        List<Object> actualRow1 = rowBatch.next();
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 1), 
actualRow1.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123000000), 
actualRow1.get(1));
+
+        List<Object> actualRow2 = rowBatch.next();
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 2), 
actualRow2.get(0));
+        Assert.assertEquals(LocalDateTime.of(2024, 3, 20, 0, 0, 0, 123456000), 
actualRow2.get(1));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class); // Three rows were written, so a fourth next() must fail.
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
+
+    @Test
+    public void testLargeInt() throws DorisException, IOException { // LARGEINT read from a Utf8 vector (k1) and from a 16-byte FixedSizeBinary vector (k2); both values exceed Long.MAX_VALUE.
+
+        ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+        childrenBuilder.add(new Field("k1", FieldType.nullable(new 
ArrowType.Utf8()), null));
+        childrenBuilder.add(
+                new Field("k2", FieldType.nullable(new 
ArrowType.FixedSizeBinary(16)), null));
+
+        VectorSchemaRoot root =
+                VectorSchemaRoot.create(
+                        new org.apache.arrow.vector.types.pojo.Schema(
+                                childrenBuilder.build(), null),
+                        new RootAllocator(Integer.MAX_VALUE));
+        ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+        ArrowStreamWriter arrowStreamWriter =
+                new ArrowStreamWriter(
+                        root, new DictionaryProvider.MapDictionaryProvider(), 
outputStream);
+
+        arrowStreamWriter.start();
+        root.setRowCount(1);
+
+        FieldVector vector = root.getVector("k1");
+        VarCharVector lageIntVector = (VarCharVector) vector;
+        lageIntVector.setInitialCapacity(1);
+        lageIntVector.allocateNew();
+        lageIntVector.setIndexDefined(0);
+        lageIntVector.setValueLengthSafe(0, 19);
+        lageIntVector.setSafe(0, "9223372036854775808".getBytes());
+        vector.setValueCount(1);
+
+        vector = root.getVector("k2");
+        FixedSizeBinaryVector lageIntVector1 = (FixedSizeBinaryVector) vector;
+        lageIntVector1.setInitialCapacity(1);
+        lageIntVector1.allocateNew();
+        lageIntVector1.setIndexDefined(0);
+        byte[] bytes = new BigInteger("9223372036854775809").toByteArray(); // big-endian two's-complement bytes (9 bytes for this value)
+        byte[] fixedBytes = new byte[16]; // zero-filled, so the left padding below is only correct for non-negative values
+        System.arraycopy(bytes, 0, fixedBytes, 16 - bytes.length, 
bytes.length); // right-align into the 16-byte buffer
+        ArrayUtils.reverse(fixedBytes); // flip to little-endian order; presumably the layout RowBatch expects for LARGEINT in FIXEDSIZEBINARY - confirm
+        lageIntVector1.setSafe(0, fixedBytes);
+        vector.setValueCount(1);
+
+        arrowStreamWriter.writeBatch();
+
+        arrowStreamWriter.end();
+        arrowStreamWriter.close();
+
+        TStatus status = new TStatus();
+        status.setStatusCode(TStatusCode.OK);
+        TScanBatchResult scanBatchResult = new TScanBatchResult();
+        scanBatchResult.setStatus(status);
+        scanBatchResult.setEos(false);
+        scanBatchResult.setRows(outputStream.toByteArray());
+
+        String schemaStr =
+                "{\"properties\":["
+                        + 
"{\"type\":\"LARGEINT\",\"name\":\"k1\",\"comment\":\"\"}, "
+                        + 
"{\"type\":\"LARGEINT\",\"name\":\"k2\",\"comment\":\"\"}"
+                        + "], \"status\":200}";
+
+        Schema schema = RestService.parseSchema(schemaStr, logger);
+
+        RowBatch rowBatch = new RowBatch(scanBatchResult, schema).readArrow();
+
+        Assert.assertTrue(rowBatch.hasNext());
+        List<Object> actualRow0 = rowBatch.next();
+
+        Assert.assertEquals(new BigInteger("9223372036854775808"), 
actualRow0.get(0));
+        Assert.assertEquals(new BigInteger("9223372036854775809"), 
actualRow0.get(1));
+
+        Assert.assertFalse(rowBatch.hasNext());
+        thrown.expect(NoSuchElementException.class); // Only one row was written, so one more next() must fail.
+        thrown.expectMessage(startsWith("Get row offset:"));
+        rowBatch.next();
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to