This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new 8ed1134 [fix](connector) fix read date type err (#265)
8ed1134 is described below
commit 8ed1134d0dacf655cb488f408494cfbb29376842
Author: gnehil <[email protected]>
AuthorDate: Mon Feb 17 15:33:07 2025 +0800
[fix](connector) fix read date type err (#265)
---
.../spark/client/entity/DorisReaderPartition.java | 19 +++-
.../spark/client/read/AbstractThriftReader.java | 7 +-
.../spark/client/read/DorisFlightSqlReader.java | 4 +-
.../client/read/ReaderPartitionGenerator.java | 14 +--
.../apache/doris/spark/client/read/RowBatch.java | 20 +++-
.../apache/doris/spark/rdd/AbstractDorisRDD.scala | 4 +-
.../apache/doris/spark/util/RowConvertors.scala | 5 +-
.../doris/spark/client/read/RowBatchTest.java | 104 ++++++++++++++++++---
.../doris/spark/util/RowConvertorsTest.scala | 22 +++--
.../spark-doris-connector-spark-2/pom.xml | 4 -
.../doris/spark/read/AbstractDorisScan.scala | 12 ++-
.../doris/spark/read/DorisPartitionReader.scala | 14 +--
12 files changed, 176 insertions(+), 53 deletions(-)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
index aa75319..608f019 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/entity/DorisReaderPartition.java
@@ -34,8 +34,10 @@ public class DorisReaderPartition implements Serializable {
private final String[] filters;
private final Integer limit;
private final DorisConfig config;
+ private final Boolean datetimeJava8ApiEnabled;
- public DorisReaderPartition(String database, String table, Backend
backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns,
String[] filters, DorisConfig config) {
+ public DorisReaderPartition(String database, String table, Backend
backend, Long[] tablets, String opaquedQueryPlan,
+ String[] readColumns, String[] filters,
DorisConfig config, Boolean datetimeJava8ApiEnabled) {
this.database = database;
this.table = table;
this.backend = backend;
@@ -45,9 +47,11 @@ public class DorisReaderPartition implements Serializable {
this.filters = filters;
this.limit = -1;
this.config = config;
+ this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
}
- public DorisReaderPartition(String database, String table, Backend
backend, Long[] tablets, String opaquedQueryPlan, String[] readColumns,
String[] filters, Integer limit, DorisConfig config) {
+ public DorisReaderPartition(String database, String table, Backend
backend, Long[] tablets, String opaquedQueryPlan,
+ String[] readColumns, String[] filters,
Integer limit, DorisConfig config, Boolean datetimeJava8ApiEnabled) {
this.database = database;
this.table = table;
this.backend = backend;
@@ -57,6 +61,7 @@ public class DorisReaderPartition implements Serializable {
this.filters = filters;
this.limit = limit;
this.config = config;
+ this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
}
// Getters and Setters
@@ -96,6 +101,10 @@ public class DorisReaderPartition implements Serializable {
return limit;
}
+ public Boolean getDateTimeJava8APIEnabled() {
+ return datetimeJava8ApiEnabled;
+ }
+
@Override
public boolean equals(Object o) {
if (o == null || getClass() != o.getClass()) return false;
@@ -108,11 +117,13 @@ public class DorisReaderPartition implements Serializable
{
&& Objects.deepEquals(readColumns, that.readColumns)
&& Objects.deepEquals(filters, that.filters)
&& Objects.equals(limit, that.limit)
- && Objects.equals(config, that.config);
+ && Objects.equals(config, that.config)
+ && Objects.equals(datetimeJava8ApiEnabled,
that.datetimeJava8ApiEnabled);
}
@Override
public int hashCode() {
- return Objects.hash(database, table, backend,
Arrays.hashCode(tablets), opaquedQueryPlan, Arrays.hashCode(readColumns),
Arrays.hashCode(filters), limit, config);
+ return Objects.hash(database, table, backend,
Arrays.hashCode(tablets), opaquedQueryPlan,
+ Arrays.hashCode(readColumns), Arrays.hashCode(filters), limit,
config, datetimeJava8ApiEnabled);
}
}
\ No newline at end of file
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
index c533de8..7fdb1cf 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/AbstractThriftReader.java
@@ -73,6 +73,8 @@ public abstract class AbstractThriftReader extends
DorisReader {
private int readCount = 0;
+ private final Boolean datetimeJava8ApiEnabled;
+
protected AbstractThriftReader(DorisReaderPartition partition) throws
Exception {
super(partition);
this.frontend = new DorisFrontendClient(config);
@@ -108,6 +110,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
this.rowBatchQueue = null;
this.asyncThread = null;
}
+ this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled(); // use the partition's flag (as DorisFlightSqlReader does) so DATE values match what RowConvertors.convertValue casts to
}
private void runAsync() throws DorisException, InterruptedException {
@@ -124,7 +127,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
});
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
- rowBatch = new RowBatch(nextResult, dorisSchema);
+ rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled);
offset += rowBatch.getReadRowCount();
rowBatch.close();
rowBatchQueue.put(rowBatch);
@@ -178,7 +181,7 @@ public abstract class AbstractThriftReader extends
DorisReader {
});
endOfStream.set(nextResult.isEos());
if (!endOfStream.get()) {
- rowBatch = new RowBatch(nextResult, dorisSchema);
+ rowBatch = new RowBatch(nextResult, dorisSchema,
datetimeJava8ApiEnabled);
}
}
hasNext = !endOfStream.get();
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index faa462b..4623d65 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -59,6 +59,7 @@ public class DorisFlightSqlReader extends DorisReader {
private final Schema schema;
private AdbcConnection connection;
private final ArrowReader arrowReader;
+ private final Boolean datetimeJava8ApiEnabled;
public DorisFlightSqlReader(DorisReaderPartition partition) throws
Exception {
super(partition);
@@ -74,6 +75,7 @@ public class DorisFlightSqlReader extends DorisReader {
}
this.schema = processDorisSchema(partition);
this.arrowReader = executeQuery();
+ this.datetimeJava8ApiEnabled = partition.getDateTimeJava8APIEnabled();
}
@Override
@@ -85,7 +87,7 @@ public class DorisFlightSqlReader extends DorisReader {
throw new DorisException(e);
}
if (!endOfStream.get()) {
- rowBatch = new RowBatch(arrowReader, schema);
+ rowBatch = new RowBatch(arrowReader, schema,
datetimeJava8ApiEnabled);
}
}
return !endOfStream.get();
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
index 4aa660a..002b58b 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/ReaderPartitionGenerator.java
@@ -45,7 +45,7 @@ public class ReaderPartitionGenerator {
/*
* for spark 2
*/
- public static DorisReaderPartition[] generatePartitions(DorisConfig
config) throws Exception {
+ public static DorisReaderPartition[] generatePartitions(DorisConfig
config, Boolean datetimeJava8ApiEnabled) throws Exception {
String[] originReadCols;
if (config.contains(DorisOptions.DORIS_READ_FIELDS) &&
!config.getValue(DorisOptions.DORIS_READ_FIELDS).equals("*")) {
originReadCols =
Arrays.stream(config.getValue(DorisOptions.DORIS_READ_FIELDS).split(","))
@@ -55,14 +55,15 @@ public class ReaderPartitionGenerator {
}
String[] filters = config.contains(DorisOptions.DORIS_FILTER_QUERY) ?
config.getValue(DorisOptions.DORIS_FILTER_QUERY).split("\\.")
: new String[0];
- return generatePartitions(config, originReadCols, filters, -1);
+ return generatePartitions(config, originReadCols, filters, -1,
datetimeJava8ApiEnabled);
}
/*
* for spark 3
*/
public static DorisReaderPartition[] generatePartitions(DorisConfig config,
- String[] fields,
String[] filters, Integer limit) throws Exception {
+ String[] fields,
String[] filters, Integer limit,
+ Boolean
datetimeJava8ApiEnabled) throws Exception {
DorisFrontendClient frontend = new DorisFrontendClient(config);
String fullTableName =
config.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER);
String[] tableParts = fullTableName.split("\\.");
@@ -81,7 +82,7 @@ public class ReaderPartitionGenerator {
Map<String, List<Long>> beToTablets = mappingBeToTablets(queryPlan);
int maxTabletSize = config.getValue(DorisOptions.DORIS_TABLET_SIZE);
return distributeTabletsToPartitions(db, table, beToTablets,
queryPlan.getOpaqued_query_plan(), maxTabletSize,
- finalReadColumns, filters, config, limit);
+ finalReadColumns, filters, config, limit,
datetimeJava8ApiEnabled);
}
@VisibleForTesting
@@ -112,7 +113,8 @@ public class ReaderPartitionGenerator {
Map<String, List<Long>> beToTablets,
String
opaquedQueryPlan, int maxTabletSize,
String[] readColumns, String[] predicates,
-
DorisConfig config, Integer limit) {
+
DorisConfig config, Integer limit,
+
Boolean datetimeJava8ApiEnabled) {
List<DorisReaderPartition> partitions = new ArrayList<>();
beToTablets.forEach((backendStr, tabletIds) -> {
List<Long> distinctTablets = new ArrayList<>(new
HashSet<>(tabletIds));
@@ -121,7 +123,7 @@ public class ReaderPartitionGenerator {
Long[] tablets = distinctTablets.subList(offset,
Math.min(offset + maxTabletSize, distinctTablets.size())).toArray(new Long[0]);
offset += maxTabletSize;
partitions.add(new DorisReaderPartition(database, table, new
Backend(backendStr), tablets,
- opaquedQueryPlan, readColumns, predicates, limit,
config));
+ opaquedQueryPlan, readColumns, predicates, limit,
config, datetimeJava8ApiEnabled));
}
});
return partitions.toArray(new DorisReaderPartition[0]);
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
index 840825c..6b06b37 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/RowBatch.java
@@ -103,11 +103,14 @@ public class RowBatch implements Serializable {
private int readRowCount = 0;
private List<FieldVector> fieldVectors;
- public RowBatch(TScanBatchResult nextResult, Schema schema) throws
DorisException {
+ private final Boolean datetimeJava8ApiEnabled;
+
+ public RowBatch(TScanBatchResult nextResult, Schema schema, Boolean
datetimeJava8ApiEnabled) throws DorisException {
this.rootAllocator = new RootAllocator(Integer.MAX_VALUE);
this.arrowReader = new ArrowStreamReader(new
ByteArrayInputStream(nextResult.getRows()), rootAllocator);
this.schema = schema;
+ this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
try {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -123,10 +126,11 @@ public class RowBatch implements Serializable {
}
- public RowBatch(ArrowReader reader, Schema schema) throws DorisException {
+ public RowBatch(ArrowReader reader, Schema schema, Boolean
datetimeJava8ApiEnabled) throws DorisException {
this.arrowReader = reader;
this.schema = schema;
+ this.datetimeJava8ApiEnabled = datetimeJava8ApiEnabled;
try {
VectorSchemaRoot root = arrowReader.getVectorSchemaRoot();
@@ -368,7 +372,11 @@ public class RowBatch implements Serializable {
}
String stringValue = new
String(date.get(rowIndex));
LocalDate localDate =
LocalDate.parse(stringValue);
- addValueToRow(rowIndex,
Date.valueOf(localDate));
+ if (datetimeJava8ApiEnabled) {
+ addValueToRow(rowIndex, localDate);
+ } else {
+ addValueToRow(rowIndex,
Date.valueOf(localDate));
+ }
}
} else {
DateDayVector date = (DateDayVector)
curFieldVector;
@@ -378,7 +386,11 @@ public class RowBatch implements Serializable {
continue;
}
LocalDate localDate =
LocalDate.ofEpochDay(date.get(rowIndex));
- addValueToRow(rowIndex,
Date.valueOf(localDate));
+ if (datetimeJava8ApiEnabled) {
+ addValueToRow(rowIndex, localDate);
+ } else {
+ addValueToRow(rowIndex,
Date.valueOf(localDate));
+ }
}
}
break;
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index bb9008a..50f6474 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -51,7 +51,9 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
*/
@transient private[spark] lazy val dorisCfg =
DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)
- @transient private[spark] lazy val dorisPartitions =
ReaderPartitionGenerator.generatePartitions(dorisCfg)
+ @transient private[spark] lazy val dateTimeJava8ApiEnabled =
sc.getConf.get("spark.sql.datetime.java8API.enabled", "false").toBoolean
+
+ @transient private[spark] lazy val dorisPartitions =
ReaderPartitionGenerator.generatePartitions(dorisCfg, dateTimeJava8ApiEnabled)
}
private[spark] class DorisPartition(rddId: Int, idx: Int, val dorisPartition:
DorisReaderPartition)
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
index aefb013..dd8a7d5 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala
@@ -23,10 +23,12 @@ import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.SpecializedGetters
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, DateTimeUtils}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import java.sql.{Date, Timestamp}
+import java.time.LocalDate
import scala.collection.mutable
object RowConvertors {
@@ -105,10 +107,11 @@ object RowConvertors {
}
}
- def convertValue(v: Any, dataType: DataType): Any = {
+ def convertValue(v: Any, dataType: DataType, datetimeJava8ApiEnabled:
Boolean): Any = {
dataType match {
case StringType => UTF8String.fromString(v.asInstanceOf[String])
case TimestampType =>
DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf(v.asInstanceOf[String]))
+ case DateType if datetimeJava8ApiEnabled =>
v.asInstanceOf[LocalDate].toEpochDay.toInt
case DateType => DateTimeUtils.fromJavaDate(v.asInstanceOf[Date])
case _: MapType =>
val map = v.asInstanceOf[Map[String, String]]
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
index 134595f..7b0cca7 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/client/read/RowBatchTest.java
@@ -56,6 +56,8 @@ import org.apache.doris.sdk.thrift.TStatusCode;
import org.apache.doris.spark.exception.DorisException;
import org.apache.doris.spark.rest.RestService;
import org.apache.doris.spark.rest.models.Schema;
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.internal.SQLConf$;
import org.apache.spark.sql.types.Decimal;
import static org.hamcrest.core.StringStartsWith.startsWith;
import org.junit.Assert;
@@ -73,6 +75,7 @@ import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.charset.StandardCharsets;
import java.sql.Date;
+import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.Arrays;
@@ -261,7 +264,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
List<Object> expectedRow1 = Arrays.asList(
Boolean.TRUE,
@@ -375,7 +378,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -439,7 +442,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -527,7 +530,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -602,7 +605,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -683,7 +686,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
Assert.assertEquals(JavaConverters.mapAsScalaMapConverter(ImmutableMap.of("k1",
"0")).asScala(),
rowBatch.next().get(0));
@@ -749,7 +752,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
Assert.assertEquals("{\"a\":\"a1\",\"b\":1}", rowBatch.next().get(0));
@@ -827,7 +830,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -902,7 +905,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
@@ -995,7 +998,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
assertEquals("0.0.0.0", actualRow0.get(0));
@@ -1104,7 +1107,7 @@ public class RowBatchTest {
Schema schema = RestService.parseSchema(schemaStr, logger);
- RowBatch rowBatch = new RowBatch(scanBatchResult, schema);
+ RowBatch rowBatch = new RowBatch(scanBatchResult, schema, false);
Assert.assertTrue(rowBatch.hasNext());
List<Object> actualRow0 = rowBatch.next();
assertEquals("::", actualRow0.get(0));
@@ -1163,4 +1166,83 @@ public class RowBatchTest {
rowBatch.next();
}
+ @Test
+ public void testDatetimeJava8API() throws DorisException, IOException {
+
+ ImmutableList.Builder<Field> childrenBuilder = ImmutableList.builder();
+ childrenBuilder.add(new Field("k0", FieldType.nullable(new
ArrowType.Utf8()), null));
+ childrenBuilder.add(new Field("k1", FieldType.nullable(new
ArrowType.Date(DateUnit.DAY)), null));
+
+ VectorSchemaRoot root = VectorSchemaRoot.create(
+ new
org.apache.arrow.vector.types.pojo.Schema(childrenBuilder.build(), null),
+ new RootAllocator(Integer.MAX_VALUE));
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ ArrowStreamWriter arrowStreamWriter = new ArrowStreamWriter(
+ root,
+ new DictionaryProvider.MapDictionaryProvider(),
+ outputStream);
+
+ arrowStreamWriter.start();
+ root.setRowCount(1);
+
+ FieldVector vector = root.getVector("k0");
+ VarCharVector dateVector = (VarCharVector) vector;
+ dateVector.setInitialCapacity(1);
+ dateVector.allocateNew();
+ dateVector.setIndexDefined(0);
+ dateVector.setValueLengthSafe(0, 20);
+ dateVector.setSafe(0, "2025-01-01".getBytes());
+ vector.setValueCount(1);
+
+ LocalDate localDate = LocalDate.of(2025, 2, 1);
+ long date = localDate.toEpochDay();
+
+ vector = root.getVector("k1");
+ DateDayVector date2Vector = (DateDayVector) vector;
+ date2Vector.setInitialCapacity(1);
+ date2Vector.allocateNew();
+ date2Vector.setIndexDefined(0);
+ date2Vector.setSafe(0, (int) date);
+ vector.setValueCount(1);
+
+ arrowStreamWriter.writeBatch();
+
+ arrowStreamWriter.end();
+ arrowStreamWriter.close();
+
+ TStatus status = new TStatus();
+ status.setStatusCode(TStatusCode.OK);
+ TScanBatchResult scanBatchResult = new TScanBatchResult();
+ scanBatchResult.setStatus(status);
+ scanBatchResult.setEos(false);
+ scanBatchResult.setRows(outputStream.toByteArray());
+
+
+ String schemaStr = "{\"properties\":[" +
+ "{\"type\":\"DATE\",\"name\":\"k0\",\"comment\":\"\"}, " +
+ "{\"type\":\"DATEV2\",\"name\":\"k1\",\"comment\":\"\"}" +
+ "], \"status\":200}";
+
+ Schema schema = RestService.parseSchema(schemaStr, logger);
+
+ RowBatch rowBatch1 = new RowBatch(scanBatchResult, schema, false);
+
+ Assert.assertTrue(rowBatch1.hasNext());
+ List<Object> actualRow0 = rowBatch1.next();
+ Assert.assertEquals(Date.valueOf("2025-01-01"), actualRow0.get(0));
+ Assert.assertEquals(Date.valueOf("2025-02-01"), actualRow0.get(1));
+
+ Assert.assertFalse(rowBatch1.hasNext());
+
+ RowBatch rowBatch2 = new RowBatch(scanBatchResult, schema, true);
+
+ Assert.assertTrue(rowBatch2.hasNext());
+ List<Object> actualRow01 = rowBatch2.next();
+ Assert.assertEquals(LocalDate.of(2025,1,1), actualRow01.get(0));
+ Assert.assertEquals(localDate, actualRow01.get(1));
+
+ Assert.assertFalse(rowBatch2.hasNext());
+
+ }
+
}
\ No newline at end of file
diff --git
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
index eaa0ad0..ce36c06 100644
---
a/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
+++
b/spark-doris-connector/spark-doris-connector-base/src/test/scala/org/apache/doris/spark/util/RowConvertorsTest.scala
@@ -25,6 +25,7 @@ import org.junit.Assert
import org.junit.jupiter.api.Test
import java.sql.{Date, Timestamp}
+import java.time.LocalDate
class RowConvertorsTest {
@@ -103,16 +104,17 @@ class RowConvertorsTest {
@Test def convertValue(): Unit = {
- Assert.assertTrue(RowConvertors.convertValue(1,
DataTypes.IntegerType).isInstanceOf[Int])
- Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat,
DataTypes.FloatType).isInstanceOf[Float])
- Assert.assertTrue(RowConvertors.convertValue(4.5,
DataTypes.DoubleType).isInstanceOf[Double])
- Assert.assertTrue(RowConvertors.convertValue(6.toShort,
DataTypes.ShortType).isInstanceOf[Short])
- Assert.assertTrue(RowConvertors.convertValue(7L,
DataTypes.LongType).isInstanceOf[Long])
- Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11),
20, 4), DecimalType(20, 4)).isInstanceOf[Decimal])
- Assert.assertTrue(RowConvertors.convertValue("2024-01-01",
DataTypes.DateType).isInstanceOf[Int])
- Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56",
DataTypes.TimestampType).isInstanceOf[Long])
- Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" ->
"1"), MapType(DataTypes.StringType,
DataTypes.StringType)).isInstanceOf[MapData])
- Assert.assertTrue(RowConvertors.convertValue("test",
DataTypes.StringType).isInstanceOf[UTF8String])
+ Assert.assertTrue(RowConvertors.convertValue(1, DataTypes.IntegerType,
false).isInstanceOf[Int])
+ Assert.assertTrue(RowConvertors.convertValue(2.3.toFloat,
DataTypes.FloatType, false).isInstanceOf[Float])
+ Assert.assertTrue(RowConvertors.convertValue(4.5, DataTypes.DoubleType,
false).isInstanceOf[Double])
+ Assert.assertTrue(RowConvertors.convertValue(6.toShort,
DataTypes.ShortType, false).isInstanceOf[Short])
+ Assert.assertTrue(RowConvertors.convertValue(7L, DataTypes.LongType,
false).isInstanceOf[Long])
+ Assert.assertTrue(RowConvertors.convertValue(Decimal(BigDecimal(8910.11),
20, 4), DecimalType(20, 4), false).isInstanceOf[Decimal])
+ Assert.assertTrue(RowConvertors.convertValue(Date.valueOf("2024-01-01"),
DataTypes.DateType, false).isInstanceOf[Int])
+ Assert.assertTrue(RowConvertors.convertValue(LocalDate.now(),
DataTypes.DateType, true).isInstanceOf[Int])
+ Assert.assertTrue(RowConvertors.convertValue("2024-01-01 12:34:56",
DataTypes.TimestampType, false).isInstanceOf[Long])
+ Assert.assertTrue(RowConvertors.convertValue(Map[String, String]("a" ->
"1"), MapType(DataTypes.StringType, DataTypes.StringType),
false).isInstanceOf[MapData])
+ Assert.assertTrue(RowConvertors.convertValue("test", DataTypes.StringType,
false).isInstanceOf[UTF8String])
}
diff --git a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
index 254db30..b402084 100644
--- a/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
+++ b/spark-doris-connector/spark-doris-connector-spark-2/pom.xml
@@ -35,10 +35,6 @@
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <spark.version>2.4.8</spark.version>
- <spark.major.version>2.4</spark.major.version>
- <scala.version>2.11.12</scala.version>
- <scala.major.version>2.11</scala.major.version>
</properties>
<dependencies>
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
index 34bff24..4afcc01 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/AbstractDorisScan.scala
@@ -22,6 +22,7 @@ import
org.apache.doris.spark.client.read.ReaderPartitionGenerator
import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.connector.read.{Batch, InputPartition,
PartitionReaderFactory, Scan}
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import scala.language.implicitConversions
@@ -35,7 +36,9 @@ abstract class AbstractDorisScan(config: DorisConfig, schema:
StructType) extend
override def toBatch: Batch = this
override def planInputPartitions(): Array[InputPartition] = {
- ReaderPartitionGenerator.generatePartitions(config, schema.names,
compiledFilters(), getLimit).map(toInputPartition)
+ ReaderPartitionGenerator.generatePartitions(config, schema.names,
compiledFilters(), getLimit,
+ SQLConf.get.datetimeJava8ApiEnabled)
+ .map(toInputPartition)
}
@@ -44,7 +47,8 @@ abstract class AbstractDorisScan(config: DorisConfig, schema:
StructType) extend
}
private def toInputPartition(rp: DorisReaderPartition): DorisInputPartition =
- DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend,
rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan, rp.getReadColumns,
rp.getFilters, rp.getLimit)
+ DorisInputPartition(rp.getDatabase, rp.getTable, rp.getBackend,
rp.getTablets.map(_.toLong), rp.getOpaquedQueryPlan,
+ rp.getReadColumns, rp.getFilters, rp.getLimit,
rp.getDateTimeJava8APIEnabled)
protected def compiledFilters(): Array[String]
@@ -52,4 +56,6 @@ abstract class AbstractDorisScan(config: DorisConfig, schema:
StructType) extend
}
-case class DorisInputPartition(database: String, table: String, backend:
Backend, tablets: Array[Long], opaquedQueryPlan: String, readCols:
Array[String], predicates: Array[String], limit: Int = -1) extends
InputPartition
+case class DorisInputPartition(database: String, table: String, backend:
Backend, tablets: Array[Long],
+ opaquedQueryPlan: String, readCols:
Array[String], predicates: Array[String],
+ limit: Int = -1, datetimeJava8ApiEnabled:
Boolean) extends InputPartition
diff --git
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
index 42be1c5..0061fff 100644
---
a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
+++
b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/read/DorisPartitionReader.scala
@@ -22,7 +22,7 @@ import
org.apache.doris.spark.client.read.{DorisFlightSqlReader, DorisReader, Do
import org.apache.doris.spark.config.DorisConfig
import org.apache.doris.spark.util.RowConvertors
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
+import org.apache.spark.sql.catalyst.expressions.GenericInternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader}
import org.apache.spark.sql.types.StructType
@@ -34,7 +34,7 @@ class DorisPartitionReader(inputPartition: InputPartition,
schema: StructType, m
private implicit def toReaderPartition(inputPart: DorisInputPartition):
DorisReaderPartition = {
val tablets = inputPart.tablets.map(java.lang.Long.valueOf)
new DorisReaderPartition(inputPart.database, inputPart.table,
inputPart.backend, tablets,
- inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates,
inputPart.limit, config)
+ inputPart.opaquedQueryPlan, inputPart.readCols, inputPart.predicates,
inputPart.limit, config, inputPart.datetimeJava8ApiEnabled)
}
private lazy val reader: DorisReader = {
@@ -45,19 +45,21 @@ class DorisPartitionReader(inputPartition: InputPartition,
schema: StructType, m
}
}
+ private val datetimeJava8ApiEnabled: Boolean =
inputPartition.asInstanceOf[DorisInputPartition].datetimeJava8ApiEnabled
+
override def next(): Boolean = reader.hasNext
override def get(): InternalRow = {
val values = reader.next().asInstanceOf[Array[Any]]
+ val row = new GenericInternalRow(schema.length)
if (values.nonEmpty) {
- val row = new SpecificInternalRow(schema.fields.map(_.dataType))
values.zipWithIndex.foreach {
case (value, index) =>
if (value == null) row.setNullAt(index)
- else row.update(index, RowConvertors.convertValue(value,
schema.fields(index).dataType))
+ else row.update(index, RowConvertors.convertValue(value,
schema.fields(index).dataType, datetimeJava8ApiEnabled))
}
- row
- } else null.asInstanceOf[InternalRow]
+ }
+ row
}
override def close(): Unit = {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]