This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 88332deba5 [python] Avro schema inconsistencies between Java and Python (#7077)
88332deba5 is described below
commit 88332deba57d22506f34fa29d35386583c43e9be
Author: ChengHui Chen <[email protected]>
AuthorDate: Thu Jan 22 17:45:58 2026 +0800
[python] Avro schema inconsistencies between Java and Python (#7077)
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 267 +++++++++++----------
paimon-python/dev/requirements-dev.txt | 3 +-
paimon-python/dev/run_mixed_tests.sh | 34 +--
paimon-python/pypaimon/schema/data_types.py | 57 +++--
.../pypaimon/tests/e2e/java_py_read_write_test.py | 90 ++++---
5 files changed, 261 insertions(+), 190 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 39c2011bf4..ef9ba888fc 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -106,118 +106,130 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testJavaWriteReadAppendTable() throws Exception {
- Identifier identifier = identifier("mixed_test_append_tablej");
- Schema schema =
- Schema.newBuilder()
- .column("id", DataTypes.INT())
- .column("name", DataTypes.STRING())
- .column("category", DataTypes.STRING())
- .column("value", DataTypes.DOUBLE())
- .partitionKeys("category")
- .option("dynamic-partition-overwrite", "false")
- .build();
-
- catalog.createTable(identifier, schema, true);
- Table table = catalog.getTable(identifier);
- FileStoreTable fileStoreTable = (FileStoreTable) table;
-
- try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
- InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
-
- write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
- write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
- write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
- write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
- write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
- write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
-
- commit.commit(0, write.prepareCommit(true, 0));
+ for (String format : Arrays.asList("parquet", "orc", "avro")) {
+ Identifier identifier = identifier("mixed_test_append_tablej_" + format);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("category", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .column("ts", DataTypes.TIMESTAMP())
+ .column("ts_ltz",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .partitionKeys("category")
+ .option("dynamic-partition-overwrite", "false")
+ .option("file.format", format)
+ .build();
+
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+ InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
+
+ write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
+ write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
+ write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
+ write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
+ write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
+ write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
+
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ List<Split> splits =
+ new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40,
1970-01-01T00:33:20",
+ "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001,
1970-01-01T00:33:20.001",
+ "3, Carrot, Vegetable, 0.6,
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+ "4, Broccoli, Vegetable, 1.2,
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+ "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004,
1970-01-01T00:33:20.004",
+ "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005,
1970-01-01T00:33:20.005");
}
-
- List<Split> splits =
- new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
- TableRead read = fileStoreTable.newRead();
- List<String> res =
- getResult(
- read,
- splits,
- row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
- assertThat(res)
- .containsExactlyInAnyOrder(
- "1, Apple, Fruit, 1.5",
- "2, Banana, Fruit, 0.8",
- "3, Carrot, Vegetable, 0.6",
- "4, Broccoli, Vegetable, 1.2",
- "5, Chicken, Meat, 5.0",
- "6, Beef, Meat, 8.0");
}
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testReadAppendTable() throws Exception {
- Identifier identifier = identifier("mixed_test_append_tablep");
- Table table = catalog.getTable(identifier);
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- List<Split> splits =
- new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
- TableRead read = fileStoreTable.newRead();
- List<String> res =
- getResult(
- read,
- splits,
- row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
- System.out.println(res);
+ for (String format : Arrays.asList("parquet", "orc", "avro")) {
+ Identifier identifier = identifier("mixed_test_append_tablep_" + format);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<Split> splits =
+ new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ System.out.println(res);
+ }
}
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testJavaWriteReadPkTable() throws Exception {
- Identifier identifier = identifier("mixed_test_pk_tablej");
- Schema schema =
- Schema.newBuilder()
- .column("id", DataTypes.INT())
- .column("name", DataTypes.STRING())
- .column("category", DataTypes.STRING())
- .column("value", DataTypes.DOUBLE())
- .primaryKey("id")
- .partitionKeys("category")
- .option("dynamic-partition-overwrite", "false")
- .option("bucket", "2")
- .build();
-
- catalog.createTable(identifier, schema, true);
- Table table = catalog.getTable(identifier);
- FileStoreTable fileStoreTable = (FileStoreTable) table;
-
- try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
- InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
-
- write.write(createRow4Cols(1, "Apple", "Fruit", 1.5));
- write.write(createRow4Cols(2, "Banana", "Fruit", 0.8));
- write.write(createRow4Cols(3, "Carrot", "Vegetable", 0.6));
- write.write(createRow4Cols(4, "Broccoli", "Vegetable", 1.2));
- write.write(createRow4Cols(5, "Chicken", "Meat", 5.0));
- write.write(createRow4Cols(6, "Beef", "Meat", 8.0));
-
- commit.commit(0, write.prepareCommit(true, 0));
+ for (String format : Arrays.asList("parquet", "orc", "avro")) {
+ Identifier identifier = identifier("mixed_test_pk_tablej_" + format);
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("category", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .column("ts", DataTypes.TIMESTAMP())
+ .column("ts_ltz",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+ .primaryKey("id")
+ .partitionKeys("category")
+ .option("dynamic-partition-overwrite", "false")
+ .option("bucket", "2")
+ .option("file.format", format)
+ .build();
+
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+ InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
+
+ write.write(createRow6Cols(1, "Apple", "Fruit", 1.5, 1000000L, 2000000L));
+ write.write(createRow6Cols(2, "Banana", "Fruit", 0.8, 1000001L, 2000001L));
+ write.write(createRow6Cols(3, "Carrot", "Vegetable", 0.6, 1000002L, 2000002L));
+ write.write(createRow6Cols(4, "Broccoli", "Vegetable", 1.2, 1000003L, 2000003L));
+ write.write(createRow6Cols(5, "Chicken", "Meat", 5.0, 1000004L, 2000004L));
+ write.write(createRow6Cols(6, "Beef", "Meat", 8.0, 1000005L, 2000005L));
+
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ List<Split> splits =
+ new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40,
1970-01-01T00:33:20",
+ "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001,
1970-01-01T00:33:20.001",
+ "3, Carrot, Vegetable, 0.6,
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+ "4, Broccoli, Vegetable, 1.2,
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+ "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004,
1970-01-01T00:33:20.004",
+ "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005,
1970-01-01T00:33:20.005");
}
-
- List<Split> splits =
- new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
- TableRead read = fileStoreTable.newRead();
- List<String> res =
- getResult(
- read,
- splits,
- row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
- assertThat(res)
- .containsExactlyInAnyOrder(
- "1, Apple, Fruit, 1.5",
- "2, Banana, Fruit, 0.8",
- "3, Carrot, Vegetable, 0.6",
- "4, Broccoli, Vegetable, 1.2",
- "5, Chicken, Meat, 5.0",
- "6, Beef, Meat, 8.0");
}
@Test
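
For reference, the expected strings in the assertions above follow directly from the epoch millis passed to createRow6Cols: 1,000,000 ms is 16 minutes 40 seconds past the epoch, 2,000,000 ms is 33 minutes 20 seconds, and each row is one millisecond apart. A quick sanity check in plain Python (not part of the patch):

    from datetime import datetime, timezone

    # 1000000 ms -> 1970-01-01T00:16:40, 2000000 ms -> 1970-01-01T00:33:20
    for ms in (1000000, 1000001, 2000000, 2000005):
        dt = datetime.fromtimestamp(ms / 1000, tz=timezone.utc)
        print(ms, dt.strftime('%Y-%m-%dT%H:%M:%S.%f'))
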
@@ -357,26 +369,31 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
public void testReadPkTable() throws Exception {
- Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
- Table table = catalog.getTable(identifier);
- FileStoreTable fileStoreTable = (FileStoreTable) table;
- List<Split> splits =
- new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
- TableRead read = fileStoreTable.newRead();
- List<String> res =
- getResult(
- read,
- splits,
- row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
- System.out.println("Result: " + res);
- assertThat(res)
- .containsExactlyInAnyOrder(
- "1, Apple, Fruit, 1.5",
- "2, Banana, Fruit, 0.8",
- "3, Carrot, Vegetable, 0.6",
- "4, Broccoli, Vegetable, 1.2",
- "5, Chicken, Meat, 5.0",
- "6, Beef, Meat, 8.0");
+ for (String format : Arrays.asList("parquet", "orc", "avro")) {
+ Identifier identifier = identifier("mixed_test_pk_tablep_" + format);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<Split> splits =
+ new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+ row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ System.out.println("Result for " + format + " : " + res);
+ assertThat(table.rowType().getFieldTypes().get(4)).isEqualTo(DataTypes.TIMESTAMP());
+ assertThat(table.rowType().getFieldTypes().get(5))
+ .isEqualTo(DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE());
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5, 1970-01-01T00:16:40,
1970-01-01T00:33:20",
+ "2, Banana, Fruit, 0.8, 1970-01-01T00:16:40.001,
1970-01-01T00:33:20.001",
+ "3, Carrot, Vegetable, 0.6,
1970-01-01T00:16:40.002, 1970-01-01T00:33:20.002",
+ "4, Broccoli, Vegetable, 1.2,
1970-01-01T00:16:40.003, 1970-01-01T00:33:20.003",
+ "5, Chicken, Meat, 5.0, 1970-01-01T00:16:40.004,
1970-01-01T00:33:20.004",
+ "6, Beef, Meat, 8.0, 1970-01-01T00:16:40.005,
1970-01-01T00:33:20.005");
+ }
}
// Helper method from TableTestBase
@@ -407,9 +424,15 @@ public class JavaPyE2ETest {
FileIOFinder.find(tablePath), tablePath, tableSchema, CatalogEnvironment.empty());
}
- private static InternalRow createRow4Cols(int id, String name, String category, double value) {
+ private static InternalRow createRow6Cols(
+ int id, String name, String category, double value, long ts, long tsLtz) {
return GenericRow.of(
- id, BinaryString.fromString(name), BinaryString.fromString(category), value);
+ id,
+ BinaryString.fromString(name),
+ BinaryString.fromString(category),
+ value,
+ org.apache.paimon.data.Timestamp.fromEpochMillis(ts),
+ org.apache.paimon.data.Timestamp.fromEpochMillis(tsLtz));
}
protected GenericRow createRow3Cols(Object... values) {
diff --git a/paimon-python/dev/requirements-dev.txt b/paimon-python/dev/requirements-dev.txt
index 0f61cccb68..12de5f0404 100644
--- a/paimon-python/dev/requirements-dev.txt
+++ b/paimon-python/dev/requirements-dev.txt
@@ -22,4 +22,5 @@ duckdb==1.3.2
flake8==4.0.1
pytest~=7.0
ray==2.48.0
-requests
\ No newline at end of file
+requests
+parameterized
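
The new parameterized dependency backs the @parameterized.expand decorators used in java_py_read_write_test.py below. A minimal standalone illustration of the pattern (hypothetical test class, same shape as the real ones):

    import unittest
    from parameterized import parameterized

    class FormatTest(unittest.TestCase):
        # Expands into one generated test case per file format,
        # mirroring get_file_format_params() in the test file below.
        @parameterized.expand([('parquet',), ('orc',), ('avro',)])
        def test_format(self, file_format):
            self.assertIn(file_format, {'parquet', 'orc', 'avro', 'lance'})
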
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index a4a404341e..cd39c6e219 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -60,18 +60,18 @@ cleanup_warehouse() {
# Function to run Java test
run_java_write_test() {
- echo -e "${YELLOW}=== Step 1: Running Java Write Tests (Parquet + Lance)
===${NC}"
+ echo -e "${YELLOW}=== Step 1: Running Java Write Tests (Parquet/Orc/Avro +
Lance) ===${NC}"
cd "$PROJECT_ROOT"
- # Run the Java test method for parquet format
- echo "Running Maven test for JavaPyE2ETest.testJavaWriteReadPkTable
(Parquet)..."
+ # Run the Java test method for Parquet/Orc/Avro format
+ echo "Running Maven test for JavaPyE2ETest.testJavaWriteReadPkTable
(Parquet/Orc/Avro)..."
echo "Note: Maven may download dependencies on first run, this may take a
while..."
local parquet_result=0
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
- echo -e "${GREEN}✓ Java write parquet test completed successfully${NC}"
+ echo -e "${GREEN}✓ Java write Parquet/Orc/Avro test completed
successfully${NC}"
else
- echo -e "${RED}✗ Java write parquet test failed${NC}"
+ echo -e "${RED}✗ Java write Parquet/Orc/Avro test failed${NC}"
parquet_result=1
fi
@@ -102,7 +102,7 @@ run_python_read_test() {
cd "$PAIMON_PYTHON_DIR"
- # Run the parameterized Python test method (runs for both parquet and lance)
+ # Run the parameterized Python test method (runs for both Parquet/Orc/Avro and Lance)
echo "Running Python test for JavaPyReadWriteTest.test_read_pk_table..."
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_read_pk_table" -v; then
echo -e "${GREEN}✓ Python test completed successfully${NC}"
@@ -121,7 +121,7 @@ run_python_write_test() {
cd "$PAIMON_PYTHON_DIR"
- # Run the parameterized Python test method for writing data (runs for both parquet and lance)
+ # Run the parameterized Python test method for writing data (runs for both Parquet/Orc/Avro and Lance)
echo "Running Python test for
JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..."
if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then
echo -e "${GREEN}✓ Python write test completed successfully${NC}"
@@ -134,21 +134,21 @@ run_python_write_test() {
# Function to run Java Read test for Python-Write-Java-Read scenario
run_java_read_test() {
- echo -e "${YELLOW}=== Step 4: Running Java Read Test
(JavaPyE2ETest.testReadPkTable for parquet,
JavaPyLanceE2ETest.testReadPkTableLance for lance) ===${NC}"
+ echo -e "${YELLOW}=== Step 4: Running Java Read Test
(JavaPyE2ETest.testReadPkTable for Parquet/Orc/Avro,
JavaPyLanceE2ETest.testReadPkTableLance for Lance) ===${NC}"
cd "$PROJECT_ROOT"
PYTHON_VERSION=$(python -c "import sys; print(f'{sys.version_info.major}.{sys.version_info.minor}')" 2>/dev/null || echo "unknown")
echo "Detected Python version: $PYTHON_VERSION"
- # Run Java test for parquet format in paimon-core
- echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read
Parquet)..."
+ # Run Java test for Parquet/Orc/Avro format in paimon-core
+ echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read
Parquet/Orc/Avro)..."
echo "Note: Maven may download dependencies on first run, this may take a
while..."
local parquet_result=0
if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true -Dpython.version="$PYTHON_VERSION"; then
- echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
+ echo -e "${GREEN}✓ Java read Parquet/Orc/Avro test completed
successfully${NC}"
else
- echo -e "${RED}✗ Java read parquet test failed${NC}"
+ echo -e "${RED}✗ Java read Parquet/Orc/Avro test failed${NC}"
parquet_result=1
fi
@@ -293,7 +293,7 @@ main() {
echo ""
fi
- # Run Java read test (handles both parquet and lance)
+ # Run Java read test (handles both Parquet/Orc/Avro and Lance)
if ! run_java_read_test; then
java_read_result=1
fi
@@ -314,9 +314,9 @@ main() {
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
- echo -e "${GREEN}✓ Java Write Test (Parquet + Lance): PASSED${NC}"
+ echo -e "${GREEN}✓ Java Write Test (Parquet/Orc/Avro + Lance):
PASSED${NC}"
else
- echo -e "${RED}✗ Java Write Test (Parquet + Lance): FAILED${NC}"
+ echo -e "${RED}✗ Java Write Test (Parquet/Orc/Avro + Lance):
FAILED${NC}"
fi
if [[ $python_read_result -eq 0 ]]; then
@@ -332,9 +332,9 @@ main() {
fi
if [[ $java_read_result -eq 0 ]]; then
- echo -e "${GREEN}✓ Java Read Test (Parquet + Lance): PASSED${NC}"
+ echo -e "${GREEN}✓ Java Read Test (Parquet/Orc/Avro + Lance):
PASSED${NC}"
else
- echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
+ echo -e "${RED}✗ Java Read Test (Parquet/Orc/Avro + Lance):
FAILED${NC}"
fi
if [[ $pk_dv_result -eq 0 ]]; then
diff --git a/paimon-python/pypaimon/schema/data_types.py b/paimon-python/pypaimon/schema/data_types.py
index 91404cb193..51787c7dcb 100755
--- a/paimon-python/pypaimon/schema/data_types.py
+++ b/paimon-python/pypaimon/schema/data_types.py
@@ -434,20 +434,22 @@ class PyarrowFieldParser:
precision = int(match_p.group(1))
return pyarrow.decimal128(precision, 0)
if type_name.startswith('TIMESTAMP'):
- # WITH_LOCAL_TIME_ZONE is ambiguous and not supported
- if type_name == 'TIMESTAMP':
- return pyarrow.timestamp('us', tz=None) # default to 6
- match = re.fullmatch(r'TIMESTAMP\((\d+)\)', type_name)
+ is_ltz = type_name.startswith('TIMESTAMP_LTZ') or 'WITH LOCAL TIME ZONE' in type_name
+ tz = 'UTC' if is_ltz else None
+
+ match = re.fullmatch(r'TIMESTAMP(?:_LTZ)?\((\d+)\)(?: WITH LOCAL TIME ZONE)?', type_name)
if match:
precision = int(match.group(1))
if precision == 0:
- return pyarrow.timestamp('s', tz=None)
+ return pyarrow.timestamp('s', tz=tz)
elif 1 <= precision <= 3:
- return pyarrow.timestamp('ms', tz=None)
+ return pyarrow.timestamp('ms', tz=tz)
elif 4 <= precision <= 6:
- return pyarrow.timestamp('us', tz=None)
+ return pyarrow.timestamp('us', tz=tz)
elif 7 <= precision <= 9:
- return pyarrow.timestamp('ns', tz=None)
+ return pyarrow.timestamp('ns', tz=tz)
+
+ return pyarrow.timestamp('us', tz=tz) # default to 6
elif type_name == 'DATE':
return pyarrow.date32()
if type_name.startswith('TIME'):
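
Condensed, the patched branch above maps both spellings of the local-time-zone type to a tz-aware arrow timestamp. A sketch of the resulting behavior (assuming only that pyarrow is importable):

    import re
    import pyarrow

    def timestamp_type(type_name):
        # Both 'TIMESTAMP_LTZ(p)' and 'TIMESTAMP(p) WITH LOCAL TIME ZONE' become UTC-tagged.
        is_ltz = type_name.startswith('TIMESTAMP_LTZ') or 'WITH LOCAL TIME ZONE' in type_name
        tz = 'UTC' if is_ltz else None
        match = re.fullmatch(r'TIMESTAMP(?:_LTZ)?\((\d+)\)(?: WITH LOCAL TIME ZONE)?', type_name)
        precision = int(match.group(1)) if match else 6  # default to 6, as in the patch
        unit = 's' if precision == 0 else 'ms' if precision <= 3 else 'us' if precision <= 6 else 'ns'
        return pyarrow.timestamp(unit, tz=tz)

    assert timestamp_type('TIMESTAMP(6) WITH LOCAL TIME ZONE') == pyarrow.timestamp('us', tz='UTC')
    assert timestamp_type('TIMESTAMP_LTZ(3)') == pyarrow.timestamp('ms', tz='UTC')
    assert timestamp_type('TIMESTAMP(6)') == pyarrow.timestamp('us')
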
@@ -517,9 +519,12 @@ class PyarrowFieldParser:
type_name = 'BLOB'
elif types.is_decimal(pa_type):
type_name = f'DECIMAL({pa_type.precision}, {pa_type.scale})'
- elif types.is_timestamp(pa_type) and pa_type.tz is None:
+ elif types.is_timestamp(pa_type):
precision_mapping = {'s': 0, 'ms': 3, 'us': 6, 'ns': 9}
- type_name = f'TIMESTAMP({precision_mapping[pa_type.unit]})'
+ if pa_type.tz is None:
+ type_name = f'TIMESTAMP({precision_mapping[pa_type.unit]})'
+ else:
+ type_name = f'TIMESTAMP_LTZ({precision_mapping[pa_type.unit]})'
elif types.is_date32(pa_type):
type_name = 'DATE'
elif types.is_time(pa_type):
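
The reverse mapping above is symmetric: the tz attribute alone decides between TIMESTAMP and TIMESTAMP_LTZ, with the same unit-to-precision table. A minimal sketch:

    import pyarrow

    def paimon_type_name(pa_type):
        # Same precision mapping as the patch; tz decides the type family.
        precision = {'s': 0, 'ms': 3, 'us': 6, 'ns': 9}[pa_type.unit]
        base = 'TIMESTAMP' if pa_type.tz is None else 'TIMESTAMP_LTZ'
        return f'{base}({precision})'

    assert paimon_type_name(pyarrow.timestamp('us')) == 'TIMESTAMP(6)'
    assert paimon_type_name(pyarrow.timestamp('ms', tz='UTC')) == 'TIMESTAMP_LTZ(3)'
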
@@ -561,7 +566,8 @@ class PyarrowFieldParser:
return fields
@staticmethod
- def to_avro_type(field_type: pyarrow.DataType, field_name: str) -> Union[str, Dict[str, Any]]:
+ def to_avro_type(field_type: pyarrow.DataType, field_name: str,
+ parent_name: str = "record") -> Union[str, Dict[str, Any]]:
if pyarrow.types.is_integer(field_type):
if (pyarrow.types.is_signed_integer(field_type) and field_type.bit_width <= 32) or \
(pyarrow.types.is_unsigned_integer(field_type) and field_type.bit_width < 32):
@@ -589,31 +595,40 @@ class PyarrowFieldParser:
return {"type": "int", "logicalType": "date"}
elif pyarrow.types.is_timestamp(field_type):
unit = field_type.unit
- if unit == 'us':
- return {"type": "long", "logicalType": "timestamp-micros"}
- elif unit == 'ms':
- return {"type": "long", "logicalType": "timestamp-millis"}
+ if field_type.tz is None:
+ if unit == 'ms':
+ return {"type": "long", "logicalType": "timestamp-millis"}
+ elif unit == 'us':
+ return {"type": "long", "logicalType": "timestamp-micros"}
+ else:
+ raise ValueError(f"Avro does not support pyarrow timestamp
with unit {unit}.")
else:
- return {"type": "long", "logicalType": "timestamp-micros"}
+ if unit == 'ms':
+ return {"type": "long", "logicalType":
"local-timestamp-millis"}
+ elif unit == 'us':
+ return {"type": "long", "logicalType":
"local-timestamp-micros"}
+ else:
+ raise ValueError(f"Avro does not support pyarrow timestamp
with unit {unit}.")
elif pyarrow.types.is_list(field_type) or pyarrow.types.is_large_list(field_type):
value_field = field_type.value_field
return {
"type": "array",
- "items": PyarrowFieldParser.to_avro_type(value_field.type,
value_field.name)
+ "items": PyarrowFieldParser.to_avro_type(value_field.type,
value_field.name, parent_name)
}
elif pyarrow.types.is_struct(field_type):
- return PyarrowFieldParser.to_avro_schema(field_type, name="{}_record".format(field_name))
+ nested_name = "{}_{}".format(parent_name, field_name)
+ return PyarrowFieldParser.to_avro_schema(field_type, name=nested_name)
raise ValueError("Unsupported pyarrow type for Avro conversion:
{}".format(field_type))
@staticmethod
def to_avro_schema(pyarrow_schema: Union[pyarrow.Schema, pyarrow.StructType],
- name: str = "Root",
- namespace: str = "pyarrow.avro"
+ name: str = "record",
+ namespace: str = "org.apache.paimon.avro.generated"
) -> Dict[str, Any]:
fields = []
for field in pyarrow_schema:
- avro_type = PyarrowFieldParser.to_avro_type(field.type, field.name)
+ avro_type = PyarrowFieldParser.to_avro_type(field.type, field.name, parent_name=name)
if field.nullable:
avro_type = ["null", avro_type]
fields.append({"name": field.name, "type": avro_type})
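
Put together, the Avro schema Python now emits for a naive/aware timestamp pair lines up with the Java side: record name "record", the Paimon namespace, and "local-timestamp-*" logical types for tz-aware fields. A sketch of the expected output (the exact record wrapper is inferred from the name/namespace parameters above, not shown in this hunk):

    import pyarrow as pa
    from pypaimon.schema.data_types import PyarrowFieldParser

    schema = pa.schema([('ts', pa.timestamp('us')),
                        ('ts_ltz', pa.timestamp('us', tz='UTC'))])
    avro = PyarrowFieldParser.to_avro_schema(schema)
    # Expected, roughly (nullable fields become ['null', ...] unions):
    # {'type': 'record', 'name': 'record',
    #  'namespace': 'org.apache.paimon.avro.generated',
    #  'fields': [
    #    {'name': 'ts', 'type': ['null', {'type': 'long', 'logicalType': 'timestamp-micros'}]},
    #    {'name': 'ts_ltz', 'type': ['null', {'type': 'long', 'logicalType': 'local-timestamp-micros'}]}]}
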
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 661c5799f7..71e1fbb217 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -35,9 +35,9 @@ else:
def get_file_format_params():
if sys.version_info[:2] == (3, 6):
- return [('parquet',)]
+ return [('parquet',), ('orc',), ('avro',)]
else:
- return [('parquet',), ('lance',)]
+ return [('parquet',), ('orc',), ('avro',), ('lance',)]
class JavaPyReadWriteTest(unittest.TestCase):
@@ -50,28 +50,34 @@ class JavaPyReadWriteTest(unittest.TestCase):
})
cls.catalog.create_database('default', True)
- def test_py_write_read_append_table(self):
+ @parameterized.expand(get_file_format_params())
+ def test_py_write_read_append_table(self, file_format):
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
('category', pa.string()),
- ('value', pa.float64())
+ ('value', pa.float64()),
+ ('ts', pa.timestamp('us')),
+ ('ts_ltz', pa.timestamp('us', tz='UTC'))
])
schema = Schema.from_pyarrow_schema(
pa_schema,
partition_keys=['category'],
- options={'dynamic-partition-overwrite': 'false'}
+ options={'dynamic-partition-overwrite': 'false', 'file.format': file_format}
)
- self.catalog.create_table('default.mixed_test_append_tablep', schema, False)
- table = self.catalog.get_table('default.mixed_test_append_tablep')
+ table_name = f'default.mixed_test_append_tablep_{file_format}'
+ self.catalog.create_table(table_name, schema, False)
+ table = self.catalog.get_table(table_name)
initial_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 6],
'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
- 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
+ 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
+ 'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
+ 'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True)
})
# Write initial data
write_builder = table.new_batch_write_builder()
@@ -95,8 +101,9 @@ class JavaPyReadWriteTest(unittest.TestCase):
actual_names = set(initial_result['name'].tolist())
self.assertEqual(actual_names, expected_names)
- def test_read_append_table(self):
- table = self.catalog.get_table('default.mixed_test_append_tablej')
+ @parameterized.expand(get_file_format_params())
+ def test_read_append_table(self, file_format):
+ table = self.catalog.get_table('default.mixed_test_append_tablej_' + file_format)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
@@ -105,12 +112,23 @@ class JavaPyReadWriteTest(unittest.TestCase):
@parameterized.expand(get_file_format_params())
def test_py_write_read_pk_table(self, file_format):
- pa_schema = pa.schema([
- ('id', pa.int32()),
- ('name', pa.string()),
- ('category', pa.string()),
- ('value', pa.float64())
- ])
+ # Lance format doesn't support timestamp, so exclude timestamp columns
+ if file_format == 'lance':
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('category', pa.string()),
+ ('value', pa.float64())
+ ])
+ else:
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('category', pa.string()),
+ ('value', pa.float64()),
+ ('ts', pa.timestamp('us')),
+ ('ts_ltz', pa.timestamp('us', tz='UTC'))
+ ])
table_name = f'default.mixed_test_pk_tablep_{file_format}'
schema = Schema.from_pyarrow_schema(
@@ -120,7 +138,8 @@ class JavaPyReadWriteTest(unittest.TestCase):
options={
'dynamic-partition-overwrite': 'false',
'bucket': '2',
- 'file.format': file_format
+ 'file.format': file_format,
+ "orc.timestamp-ltz.legacy.type": "false"
}
)
@@ -135,12 +154,23 @@ class JavaPyReadWriteTest(unittest.TestCase):
self.catalog.create_table(table_name, schema, False)
table = self.catalog.get_table(table_name)
- initial_data = pd.DataFrame({
- 'id': [1, 2, 3, 4, 5, 6],
- 'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
- 'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
- 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
- })
+ # Lance format doesn't support timestamp, so exclude timestamp columns
+ if file_format == 'lance':
+ initial_data = pd.DataFrame({
+ 'id': [1, 2, 3, 4, 5, 6],
+ 'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
+ 'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
+ 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
+ })
+ else:
+ initial_data = pd.DataFrame({
+ 'id': [1, 2, 3, 4, 5, 6],
+ 'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
+ 'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
+ 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0],
+ 'ts': pd.to_datetime([1000000, 1000001, 1000002, 1000003, 1000004, 1000005], unit='ms'),
+ 'ts_ltz': pd.to_datetime([2000000, 2000001, 2000002, 2000003, 2000004, 2000005], unit='ms', utc=True)
+ })
write_builder = table.new_batch_write_builder()
table_write = write_builder.new_write()
table_commit = write_builder.new_commit()
@@ -163,12 +193,11 @@ class JavaPyReadWriteTest(unittest.TestCase):
@parameterized.expand(get_file_format_params())
def test_read_pk_table(self, file_format):
- # For parquet, read from Java-written table (no format suffix)
- # For lance, read from Java-written table (with format suffix)
- if file_format == 'parquet':
- table_name = 'default.mixed_test_pk_tablej'
- else:
- table_name = f'default.mixed_test_pk_tablej_{file_format}'
+ # Skip ORC format for Python < 3.8 due to pyarrow limitation with TIMESTAMP_INSTANT
+ if sys.version_info[:2] < (3, 8) and file_format == 'orc':
+ self.skipTest("Skipping ORC format for Python < 3.8 (pyarrow does
not support TIMESTAMP_INSTANT)")
+
+ table_name = f'default.mixed_test_pk_tablej_{file_format}'
table = self.catalog.get_table(table_name)
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
@@ -178,6 +207,9 @@ class JavaPyReadWriteTest(unittest.TestCase):
# Verify data
self.assertEqual(len(res), 6)
+ if file_format != "lance":
+ self.assertEqual(table.fields[4].type.type, "TIMESTAMP(6)")
+ self.assertEqual(table.fields[5].type.type, "TIMESTAMP(6) WITH LOCAL TIME ZONE")
# Data order may vary due to partitioning/bucketing, so compare as sets
expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
actual_names = set(res['name'].tolist())
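
On the read side, the tz distinction should survive into pandas as well; a hedged sketch of an extra check a caller could run, assuming res was produced by the same read pipeline as the tests above (e.g. table_read.to_pandas(table_scan.plan().splits())):

    # Sketch, not part of the patch: the two timestamp columns should
    # differ only in tz-awareness after a round trip.
    def check_timestamp_columns(res):
        assert res['ts'].dt.tz is None            # TIMESTAMP(6) -> naive datetimes
        assert str(res['ts_ltz'].dt.tz) == 'UTC'  # TIMESTAMP(6) WITH LOCAL TIME ZONE -> tz-aware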