This is an automated email from the ASF dual-hosted git repository.
cancai pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/calcite.git
The following commit(s) were added to refs/heads/main by this push:
new 9350097148 [CALCITE-6646] Support timestamp data type in Arrow adapter
9350097148 is described below
commit 93500971487c29ac2f34b7d84105f46015cb9e51
Author: Cancai Cai <[email protected]>
AuthorDate: Mon Mar 23 23:35:51 2026 +0800
[CALCITE-6646] Support timestamp data type in Arrow adapter
---
.../adapter/arrow/AbstractArrowEnumerator.java | 45 +++++++++-
.../adapter/arrow/ArrowFieldTypeFactory.java | 21 +++++
.../adapter/arrow/ArrowAdapterDataTypesTest.java | 96 +++++++++++++++++++---
.../calcite/adapter/arrow/ArrowAdapterTest.java | 18 ++--
.../calcite/adapter/arrow/ArrowDataTest.java | 82 ++++++++++++++++++
5 files changed, 240 insertions(+), 22 deletions(-)
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
index 6351dca36f..8cc08b9904 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/AbstractArrowEnumerator.java
@@ -20,11 +20,14 @@
import org.apache.calcite.util.ImmutableIntList;
import org.apache.calcite.util.Util;
+import org.apache.arrow.vector.TimeStampVector;
import org.apache.arrow.vector.ValueVector;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.VectorUnloader;
import org.apache.arrow.vector.ipc.ArrowFileReader;
import org.apache.arrow.vector.ipc.message.ArrowRecordBatch;
+import org.apache.arrow.vector.types.TimeUnit;
+import org.apache.arrow.vector.types.pojo.ArrowType;
import java.io.IOException;
import java.util.ArrayList;
@@ -66,16 +69,54 @@ protected void loadNextArrowBatch() {
@Override public Object current() {
+ // One projected field: return the bare value. Several fields: return an
+ // Object[] row. Every value is read through getValue(), which turns
+ // TIMESTAMP vectors into epoch milliseconds instead of calling getObject().
if (fields.size() == 1) {
- return this.valueVectors.get(0).getObject(currRowIndex);
+ return getValue(this.valueVectors.get(0), currRowIndex);
}
Object[] current = new Object[valueVectors.size()];
for (int i = 0; i < valueVectors.size(); i++) {
ValueVector vector = this.valueVectors.get(i);
- current[i] = vector.getObject(currRowIndex);
+ current[i] = getValue(vector, currRowIndex);
}
return current;
}
+  /** Reads the value stored at {@code index} in {@code vector}.
+   *
+   * <p>Vectors other than {@link TimeStampVector} are delegated to
+   * {@link ValueVector#getObject(int)}. A timestamp vector instead yields
+   * {@code null} for an empty slot, or else its raw long rescaled (according
+   * to the unit of the field's {@link ArrowType.Timestamp}) to milliseconds
+   * since epoch, which is how Calcite's Enumerable runtime represents
+   * TIMESTAMP values. */
+  private static Object getValue(ValueVector vector, int index) {
+    if (!(vector instanceof TimeStampVector)) {
+      return vector.getObject(index);
+    }
+    if (vector.isNull(index)) {
+      return null;
+    }
+    final long raw = ((TimeStampVector) vector).get(index);
+    final ArrowType.Timestamp timestampType =
+        (ArrowType.Timestamp) vector.getField().getType();
+    return toMillis(raw, timestampType.getUnit());
+  }
+
+  /** Converts a raw Arrow timestamp value to milliseconds since epoch.
+   *
+   * <p>For {@link TimeUnit#MICROSECOND} and {@link TimeUnit#NANOSECOND} this
+   * conversion is lossy: sub-millisecond precision is discarded. The division
+   * uses {@link Math#floorDiv(long, long)} rather than {@code /}, so instants
+   * before 1970-01-01 are rounded toward negative infinity (for example,
+   * -1 microsecond becomes -1 ms, i.e. 1969-12-31 23:59:59.999) instead of
+   * toward zero, which would move them forward in time.
+   *
+   * @param rawValue value read from the vector, expressed in {@code unit}
+   * @param unit unit of {@code rawValue}
+   * @return milliseconds since 1970-01-01 00:00:00
+   * @throws ArithmeticException if a {@code SECOND} value overflows when
+   *     scaled to milliseconds
+   * @throws IllegalArgumentException if {@code unit} is not handled */
+  private static long toMillis(long rawValue, TimeUnit unit) {
+    switch (unit) {
+    case SECOND:
+      // Fail loudly instead of silently wrapping around on overflow.
+      return Math.multiplyExact(rawValue, 1000L);
+    case MILLISECOND:
+      return rawValue;
+    case MICROSECOND:
+      return Math.floorDiv(rawValue, 1000L);
+    case NANOSECOND:
+      return Math.floorDiv(rawValue, 1_000_000L);
+    default:
+      throw new IllegalArgumentException("Unsupported TimeUnit: " + unit);
+    }
+  }
+
+ /** Not supported: this enumerator cannot be rewound, so this method always
+ * throws {@link UnsupportedOperationException}. */
@Override public void reset() {
throw new UnsupportedOperationException();
}
diff --git a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java
index ad993c813d..1693637e8c 100644
--- a/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java
+++ b/arrow/src/main/java/org/apache/calcite/adapter/arrow/ArrowFieldTypeFactory.java
@@ -82,6 +82,27 @@ private static RelDataType of(ArrowType arrowType,
JavaTypeFactory typeFactory)
((ArrowType.Decimal) arrowType).getScale());
case Time:
return typeFactory.createSqlType(SqlTypeName.TIME);
+ case Timestamp:
+ ArrowType.Timestamp timestampType = (ArrowType.Timestamp) arrowType;
+ int timestampPrecision;
+ switch (timestampType.getUnit()) {
+ case SECOND:
+ timestampPrecision = 0;
+ break;
+ case MILLISECOND:
+ timestampPrecision = 3;
+ break;
+ case MICROSECOND:
+ timestampPrecision = 6;
+ break;
+ case NANOSECOND:
+ timestampPrecision = 9;
+ break;
+ default:
+ throw new IllegalArgumentException("Unsupported Timestamp unit: "
+ + timestampType.getUnit());
+ }
+ return typeFactory.createSqlType(SqlTypeName.TIMESTAMP,
timestampPrecision);
default:
throw new IllegalArgumentException("Unsupported type: " + arrowType);
}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java
index 46e4404937..566b4c531e 100644
--- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterDataTypesTest.java
@@ -68,7 +68,7 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
String sql = "select \"tinyIntField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(tinyIntField=[$0])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "tinyIntField=0\ntinyIntField=1\n";
CalciteAssert.that()
.with(arrow)
@@ -82,7 +82,7 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
String sql = "select \"smallIntField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(smallIntField=[$1])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "smallIntField=0\nsmallIntField=1\n";
CalciteAssert.that()
.with(arrow)
@@ -96,7 +96,7 @@ static void initializeArrowState(@TempDir Path sharedTempDir)
String sql = "select \"intField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(intField=[$2])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "intField=0\nintField=1\n";
CalciteAssert.that()
.with(arrow)
@@ -110,7 +110,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"longField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(longField=[$5])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "longField=0\nlongField=1\n";
CalciteAssert.that()
.with(arrow)
@@ -124,7 +124,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"floatField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(floatField=[$4])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "floatField=0.0\nfloatField=1.0\n";
CalciteAssert.that()
.with(arrow)
@@ -138,7 +138,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"doubleField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(doubleField=[$6])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "doubleField=0.0\ndoubleField=1.0\n";
CalciteAssert.that()
.with(arrow)
@@ -152,7 +152,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"decimalField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(decimalField=[$8])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "decimalField=0.00\ndecimalField=1.00\n";
CalciteAssert.that()
.with(arrow)
@@ -166,7 +166,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"dateField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(dateField=[$9])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "dateField=1970-01-01\n"
+ "dateField=1970-01-02\n";
CalciteAssert.that()
@@ -181,7 +181,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"booleanField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result =
"booleanField=null\nbooleanField=true\nbooleanField=false\n";
CalciteAssert.that()
.with(arrow)
@@ -198,7 +198,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"decimalField2\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(decimalField2=[$10])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "decimalField2=20.000\ndecimalField2=21.000\n";
CalciteAssert.that()
.with(arrow)
@@ -212,7 +212,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String sql = "select \"timeField\" from arrowdatatype";
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(timeField=[$11])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "timeField=00:00:00\n"
+ "timeField=00:00:01\n";
CalciteAssert.that()
@@ -222,4 +222,78 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
.returns(result)
.explainContains(plan);
}
+
+ /** Tests projecting a timestamp column stored with Arrow unit
+ * {@code SECOND}. The column is typed {@code TIMESTAMP(0)}, so the expected
+ * values carry no fractional seconds. */
+ @Test void testTimestampSecProject() {
+ String sql = "select \"timestampSecField\" from arrowdatatype";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(timestampSecField=[$12])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
+ String result = "timestampSecField=2024-01-01 00:00:00\n"
+ + "timestampSecField=2024-01-02 00:00:00\n";
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ /** Tests projecting a timestamp column stored with Arrow unit
+ * {@code MILLISECOND}. The column is typed {@code TIMESTAMP(3)}, so the
+ * expected values show three fractional digits. */
+ @Test void testTimestampMilliProject() {
+ String sql = "select \"timestampMilliField\" from arrowdatatype";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(timestampMilliField=[$13])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
+ String result = "timestampMilliField=2024-01-01 00:00:00.000\n"
+ + "timestampMilliField=2024-01-02 00:00:00.000\n";
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-6646">[CALCITE-6646]
+ * Support timestamp data type in Arrow adapter</a>.
+ *
+ * <p>The source data is {@code 2024-01-01 00:00:00.123456} in microseconds
+ * (the second row is one day later). The Enumerable runtime only supports
+ * millisecond precision, so the sub-millisecond digits are dropped and the
+ * result is {@code 2024-01-01 00:00:00.123}. */
+ @Test void testTimestampMicroProject() {
+ String sql = "select \"timestampMicroField\" from arrowdatatype";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(timestampMicroField=[$14])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
+ String result = "timestampMicroField=2024-01-01 00:00:00.123\n"
+ + "timestampMicroField=2024-01-02 00:00:00.123\n";
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
+
+ /** Test case for
+ * <a href="https://issues.apache.org/jira/browse/CALCITE-6646">[CALCITE-6646]
+ * Support timestamp data type in Arrow adapter</a>.
+ *
+ * <p>The source data is {@code 2024-01-01 00:00:00.123456789} in nanoseconds
+ * (the second row is one day later). The Enumerable runtime only supports
+ * millisecond precision, so the sub-millisecond digits are dropped and the
+ * result is {@code 2024-01-01 00:00:00.123}. */
+ @Test void testTimestampNanoProject() {
+ String sql = "select \"timestampNanoField\" from arrowdatatype";
+ String plan = "PLAN=ArrowToEnumerableConverter\n"
+ + " ArrowProject(timestampNanoField=[$15])\n"
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
+ String result = "timestampNanoField=2024-01-01 00:00:00.123\n"
+ + "timestampNanoField=2024-01-02 00:00:00.123\n";
+ CalciteAssert.that()
+ .with(arrow)
+ .query(sql)
+ .limit(2)
+ .returns(result)
+ .explainContains(plan);
+ }
}
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
index 6027e24488..14f387509c 100644
--- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowAdapterTest.java
@@ -860,7 +860,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
+ " ArrowFilter(condition=[$7])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "booleanField=true\nbooleanField=true\n";
CalciteAssert.that()
@@ -878,7 +878,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(intField=[$2])\n"
+ " ArrowFilter(condition=[>($2, 10)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "intField=11\nintField=12\n";
CalciteAssert.that()
@@ -896,7 +896,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
+ " ArrowFilter(condition=[NOT($7)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "booleanField=false\nbooleanField=false\n";
CalciteAssert.that()
@@ -915,7 +915,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
+ " ArrowFilter(condition=[IS NOT TRUE($7)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "booleanField=null\nbooleanField=false\n";
CalciteAssert.that()
@@ -933,7 +933,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
+ " ArrowFilter(condition=[IS NOT FALSE($7)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "booleanField=null\nbooleanField=true\n";
CalciteAssert.that()
@@ -951,7 +951,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(booleanField=[$7])\n"
+ " ArrowFilter(condition=[IS NULL($7)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "booleanField=null\n";
CalciteAssert.that()
@@ -972,7 +972,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(decimalField=[$8])\n"
+ " ArrowFilter(condition=[=($8, 1.00)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "decimalField=1.00\n";
CalciteAssert.that()
@@ -989,7 +989,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(doubleField=[$6])\n"
+ " ArrowFilter(condition=[=($6, 1.0E0)])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "doubleField=1.0\n";
CalciteAssert.that()
@@ -1006,7 +1006,7 @@ static void initializeArrowState(@TempDir Path
sharedTempDir)
String plan = "PLAN=ArrowToEnumerableConverter\n"
+ " ArrowProject(stringField=[$3])\n"
+ " ArrowFilter(condition=[=($3, '1')])\n"
- + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11]])\n\n";
+ + " ArrowTableScan(table=[[ARROW, ARROWDATATYPE]], fields=[[0, 1,
2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15]])\n\n";
String result = "stringField=1\n";
CalciteAssert.that()
diff --git a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java
index 0d92da040e..8241a80dd1 100644
--- a/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java
+++ b/arrow/src/test/java/org/apache/calcite/adapter/arrow/ArrowDataTest.java
@@ -16,6 +16,8 @@
*/
package org.apache.calcite.adapter.arrow;
+import org.apache.calcite.avatica.util.DateTimeUtils;
+
import org.apache.arrow.adapter.jdbc.ArrowVectorIterator;
import org.apache.arrow.adapter.jdbc.JdbcToArrow;
import org.apache.arrow.adapter.jdbc.JdbcToArrowConfig;
@@ -32,6 +34,10 @@
import org.apache.arrow.vector.IntVector;
import org.apache.arrow.vector.SmallIntVector;
import org.apache.arrow.vector.TimeSecVector;
+import org.apache.arrow.vector.TimeStampMicroVector;
+import org.apache.arrow.vector.TimeStampMilliVector;
+import org.apache.arrow.vector.TimeStampNanoVector;
+import org.apache.arrow.vector.TimeStampSecVector;
import org.apache.arrow.vector.TinyIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;
@@ -67,6 +73,10 @@
*/
public class ArrowDataTest {
+ /** 2024-01-01 00:00:00 UTC, in epoch milliseconds.
+ *
+ * <p>Base instant for the generated timestamp columns: row {@code i} holds
+ * this instant plus {@code i} days, scaled to the column's time unit. */
+ private static final long BASE_EPOCH_MILLIS =
+ DateTimeUtils.unixTimestamp(2024, 1, 1, 0, 0, 0);
+
private final int batchSize;
private final int entries;
private byte tinyIntValue;
@@ -111,6 +121,14 @@ private Schema makeArrowDateTypeSchema() {
FieldType decimalType2 = FieldType.nullable(new ArrowType.Decimal(12, 3,
128));
FieldType dateType = FieldType.nullable(new ArrowType.Date(DateUnit.DAY));
FieldType timeType = FieldType.nullable(new
ArrowType.Time(TimeUnit.SECOND, 32));
+ FieldType timestampSecType =
+ FieldType.nullable(new ArrowType.Timestamp(TimeUnit.SECOND, null));
+ FieldType timestampMilliType =
+ FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MILLISECOND,
null));
+ FieldType timestampMicroType =
+ FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND,
null));
+ FieldType timestampNanoType =
+ FieldType.nullable(new ArrowType.Timestamp(TimeUnit.NANOSECOND, null));
childrenBuilder.add(new Field("tinyIntField", tinyIntType, null));
childrenBuilder.add(new Field("smallIntField", smallIntType, null));
@@ -124,6 +142,10 @@ private Schema makeArrowDateTypeSchema() {
childrenBuilder.add(new Field("dateField", dateType, null));
childrenBuilder.add(new Field("decimalField2", decimalType2, null));
childrenBuilder.add(new Field("timeField", timeType, null));
+ childrenBuilder.add(new Field("timestampSecField", timestampSecType,
null));
+ childrenBuilder.add(new Field("timestampMilliField", timestampMilliType,
null));
+ childrenBuilder.add(new Field("timestampMicroField", timestampMicroType,
null));
+ childrenBuilder.add(new Field("timestampNanoField", timestampNanoType,
null));
return new Schema(childrenBuilder.build(), null);
}
@@ -282,6 +304,18 @@ public void writeArrowDataType(File file) throws
IOException {
case "timeField":
timeField(vector, numRows);
break;
+ case "timestampSecField":
+ timestampSecField(vector, numRows);
+ break;
+ case "timestampMilliField":
+ timestampMilliField(vector, numRows);
+ break;
+ case "timestampMicroField":
+ timestampMicroField(vector, numRows);
+ break;
+ case "timestampNanoField":
+ timestampNanoField(vector, numRows);
+ break;
default:
throw new IllegalStateException("Not supported type yet: " +
vector.getMinorType());
}
@@ -430,4 +464,52 @@ private void timeField(FieldVector fieldVector, int
rowCount) {
}
fieldVector.setValueCount(rowCount);
}
+
+ private void timestampSecField(FieldVector fieldVector, int rowCount) {
+ TimeStampSecVector tsVector = (TimeStampSecVector) fieldVector;
+ tsVector.setInitialCapacity(rowCount);
+ tsVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ tsVector.set(i,
+ BASE_EPOCH_MILLIS / DateTimeUtils.MILLIS_PER_SECOND
+ + i * DateTimeUtils.SECONDS_PER_DAY);
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void timestampMilliField(FieldVector fieldVector, int rowCount) {
+ TimeStampMilliVector tsVector = (TimeStampMilliVector) fieldVector;
+ tsVector.setInitialCapacity(rowCount);
+ tsVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ tsVector.set(i, BASE_EPOCH_MILLIS + i * DateTimeUtils.MILLIS_PER_DAY);
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void timestampMicroField(FieldVector fieldVector, int rowCount) {
+ // Sub-millisecond part (.000456) will be truncated by the adapter.
+ TimeStampMicroVector tsVector = (TimeStampMicroVector) fieldVector;
+ tsVector.setInitialCapacity(rowCount);
+ tsVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ tsVector.set(i,
+ BASE_EPOCH_MILLIS * 1000L + 123456L
+ + i * DateTimeUtils.MILLIS_PER_DAY * 1000L);
+ }
+ fieldVector.setValueCount(rowCount);
+ }
+
+ private void timestampNanoField(FieldVector fieldVector, int rowCount) {
+ // Sub-millisecond part (.000456789) will be truncated by the adapter.
+ TimeStampNanoVector tsVector = (TimeStampNanoVector) fieldVector;
+ tsVector.setInitialCapacity(rowCount);
+ tsVector.allocateNew();
+ for (int i = 0; i < rowCount; i++) {
+ tsVector.set(i,
+ BASE_EPOCH_MILLIS * DateTimeUtils.NANOS_PER_MILLI + 123456789L
+ + i * DateTimeUtils.MILLIS_PER_DAY *
DateTimeUtils.NANOS_PER_MILLI);
+ }
+ fieldVector.setValueCount(rowCount);
+ }
}