This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 2064abe1ea [test] add e2e test case for lance read write with python and java (#6765)
2064abe1ea is described below
commit 2064abe1ea0eabaf6b4ef9196f6e1d62f7508b51
Author: XiaoHongbo <[email protected]>
AuthorDate: Sun Dec 7 18:40:55 2025 +0800
[test] add e2e test case for lance read write with python and java (#6765)
---
.../test/java/org/apache/paimon/JavaPyE2ETest.java | 84 +++++-
paimon-lance/pom.xml | 36 +++
.../org/apache/paimon/format/lance/LanceUtils.java | 8 +-
.../java/org/apache/paimon/JavaPyLanceE2ETest.java | 335 +++++++++++++++++++++
paimon-python/dev/run_mixed_tests.sh | 97 ++++--
.../pypaimon/tests/e2e/java_py_read_write_test.py | 111 ++++++-
6 files changed, 629 insertions(+), 42 deletions(-)
diff --git a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
index 3759bc8757..df0c8c7360 100644
--- a/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/JavaPyE2ETest.java
@@ -83,8 +83,8 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
- public void testJavaWriteRead() throws Exception {
- Identifier identifier = identifier("mixed_test_tablej");
+ public void testJavaWriteReadAppendTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_append_tablej");
Schema schema =
Schema.newBuilder()
.column("id", DataTypes.INT())
@@ -132,8 +132,8 @@ public class JavaPyE2ETest {
@Test
@EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
- public void testRead() throws Exception {
- Identifier identifier = identifier("mixed_test_tablep");
+ public void testReadAppendTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_append_tablep");
Table table = catalog.getTable(identifier);
FileStoreTable fileStoreTable = (FileStoreTable) table;
List<Split> splits =
@@ -147,6 +147,82 @@ public class JavaPyE2ETest {
System.out.println(res);
}
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testJavaWriteReadPkTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_pk_tablej");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("category", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .primaryKey("id")
+ .partitionKeys("category")
+ .option("dynamic-partition-overwrite", "false")
+ .option("bucket", "2")
+ .build();
+
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+                InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
+
+ write.write(createRow(1, "Apple", "Fruit", 1.5));
+ write.write(createRow(2, "Banana", "Fruit", 0.8));
+ write.write(createRow(3, "Carrot", "Vegetable", 0.6));
+ write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
+ write.write(createRow(5, "Chicken", "Meat", 5.0));
+ write.write(createRow(6, "Beef", "Meat", 8.0));
+
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ List<Split> splits =
+                new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5",
+ "2, Banana, Fruit, 0.8",
+ "3, Carrot, Vegetable, 0.6",
+ "4, Broccoli, Vegetable, 1.2",
+ "5, Chicken, Meat, 5.0",
+ "6, Beef, Meat, 8.0");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testReadPkTable() throws Exception {
+ Identifier identifier = identifier("mixed_test_pk_tablep_parquet");
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+ List<Split> splits =
+                new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ System.out.println("Result: " + res);
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5",
+ "2, Banana, Fruit, 0.8",
+ "3, Carrot, Vegetable, 0.6",
+ "4, Broccoli, Vegetable, 1.2",
+ "5, Chicken, Meat, 5.0",
+ "6, Beef, Meat, 8.0");
+ }
+
// Helper method from TableTestBase
protected Identifier identifier(String tableName) {
return new Identifier(database, tableName);
diff --git a/paimon-lance/pom.xml b/paimon-lance/pom.xml
index 9eced16548..b2d1fe76d4 100644
--- a/paimon-lance/pom.xml
+++ b/paimon-lance/pom.xml
@@ -105,6 +105,42 @@ under the License.
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-core</artifactId>
+ <version>${project.version}</version>
+ <type>test-jar</type>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.paimon</groupId>
+ <artifactId>paimon-format</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>log4j</groupId>
+ <artifactId>log4j</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
<!-- If you want to use paimon-lance, you should add following dependencies to your environment. -->
diff --git a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
index 10b181792c..28b3c6e3ea 100644
--- a/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
+++ b/paimon-lance/src/main/java/org/apache/paimon/format/lance/LanceUtils.java
@@ -100,7 +100,13 @@ public class LanceUtils {
Path converted = path;
Map<String, String> storageOptions = new HashMap<>();
- if ("oss".equals(schema)) {
+
+ if ("traceable".equals(schema)) {
+ String uriString = uri.toString();
+ if (uriString.startsWith("traceable:/")) {
+                converted = new Path(uriString.replace("traceable:/", "file:/"));
+ }
+ } else if ("oss".equals(schema)) {
assert originOptions.containsKey("fs.oss.endpoint");
assert originOptions.containsKey("fs.oss.accessKeyId");
assert originOptions.containsKey("fs.oss.accessKeySecret");
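
Aside: TraceableFileIO is Paimon's test-only FileIO that layers stream tracking over the local filesystem, so its "traceable" scheme means nothing to the Lance native layer, which resolves storage locations itself. The new branch above therefore rewrites the URI before handing it to Lance. A minimal standalone sketch of that rewrite, using a plain String in place of Paimon's Path/URI types (toLanceLocalPath is a hypothetical name, not part of LanceUtils):

    // Sketch only: the traceable:// scheme exists only in Paimon tests; the
    // Lance native reader/writer understands real schemes such as file:// or oss://.
    static String toLanceLocalPath(String uri) {
        if (uri.startsWith("traceable:/")) {
            return uri.replace("traceable:/", "file:/");
        }
        return uri;
    }
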
diff --git a/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
new file mode 100644
index 0000000000..290193340a
--- /dev/null
+++ b/paimon-lance/src/test/java/org/apache/paimon/JavaPyLanceE2ETest.java
@@ -0,0 +1,335 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.DataFormatTestUtil;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.lance.jni.LanceReader;
+import org.apache.paimon.format.lance.jni.LanceWriter;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.InnerTableCommit;
+import org.apache.paimon.table.sink.StreamTableWrite;
+import org.apache.paimon.table.source.Split;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.apache.arrow.memory.RootAllocator;
+import org.apache.arrow.vector.FieldVector;
+import org.apache.arrow.vector.IntVector;
+import org.apache.arrow.vector.VarCharVector;
+import org.apache.arrow.vector.VectorSchemaRoot;
+import org.apache.arrow.vector.types.Types;
+import org.apache.arrow.vector.types.pojo.Field;
+import org.apache.arrow.vector.types.pojo.FieldType;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.condition.EnabledIfSystemProperty;
+
+import java.nio.file.Files;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.paimon.table.SimpleTableTestBase.getResult;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Mixed language e2e test for Java and Python interoperability with Lance format. */
+public class JavaPyLanceE2ETest {
+
+ private static boolean tokioRuntimeInitialized = false;
+ private static final Object TOKIO_INIT_LOCK = new Object();
+
+ @BeforeAll
+ public static void initializeTokioRuntime() {
+ synchronized (TOKIO_INIT_LOCK) {
+ if (tokioRuntimeInitialized) {
+ return;
+ }
+ try {
+                // Try to initialize Tokio runtime by creating and reading a minimal Lance file
+                // This is a workaround to ensure Tokio runtime is available before reading
+                // Python-written files
+                java.nio.file.Path tempInitDir = Files.createTempDirectory("paimon-lance-tokio-init");
+                String testFilePath = tempInitDir.resolve("tokio_init_test.lance").toString();
+
+ // Create a minimal Lance file using Java writer
+ RootAllocator allocator = new RootAllocator();
+ try {
+ // Create a simple schema
+ Field intField =
+ new Field(
+                                    "id", FieldType.nullable(Types.MinorType.INT.getType()), null);
+ Field strField =
+ new Field(
+                                    "name",
+                                    FieldType.nullable(Types.MinorType.VARCHAR.getType()),
+                                    null);
+ org.apache.arrow.vector.types.pojo.Schema schema =
+ new org.apache.arrow.vector.types.pojo.Schema(
+ Arrays.asList(intField, strField));
+
+ // Create vectors
+ IntVector intVector = new IntVector("id", allocator);
+                    VarCharVector strVector = new VarCharVector("name", allocator);
+
+ intVector.allocateNew(1);
+ strVector.allocateNew(1);
+
+ intVector.set(0, 1);
+ strVector.set(0, "test".getBytes());
+
+ intVector.setValueCount(1);
+ strVector.setValueCount(1);
+
+ List<FieldVector> vectors = new ArrayList<>();
+ vectors.add(intVector);
+ vectors.add(strVector);
+
+                    VectorSchemaRoot vsr = new VectorSchemaRoot(schema, vectors, 1);
+
+ // Write using LanceWriter
+ Map<String, String> storageOptions = new HashMap<>();
+                    LanceWriter writer = new LanceWriter(testFilePath, storageOptions);
+ try {
+ writer.writeVsr(vsr);
+ } finally {
+ writer.close();
+ vsr.close();
+ intVector.close();
+ strVector.close();
+ }
+
+ // Try to read it back - this may initialize Tokio runtime
+                    // Note: This might fail if Tokio is not initialized, but we'll catch and
+                    // ignore
+ try {
+                        // Use field names matching the written schema ("id" and "name")
+ org.apache.paimon.types.RowType rowType =
+ org.apache.paimon.types.RowType.of(
+                                        new org.apache.paimon.types.DataType[] {
+                                                org.apache.paimon.types.DataTypes.INT(),
+                                                org.apache.paimon.types.DataTypes.STRING()
+ },
+ new String[] {"id", "name"});
+ LanceReader reader =
+                                new LanceReader(testFilePath, rowType, null, 1024, storageOptions);
+ try {
+ // Try to read at least one batch
+ reader.readBatch();
+ } finally {
+ reader.close();
+ }
+ } catch (Exception e) {
+                        // Ignore - this is expected if Tokio is not initialized
+ }
+
+ // Clean up
+ Files.deleteIfExists(Paths.get(testFilePath));
+ Files.deleteIfExists(tempInitDir);
+
+ } finally {
+ allocator.close();
+ }
+
+ tokioRuntimeInitialized = true;
+ } catch (Exception e) {
+                // Don't fail the test - this is just an attempt to initialize Tokio
+ }
+ }
+ }
+
+    java.nio.file.Path tempDir = Paths.get("../paimon-python/pypaimon/tests/e2e").toAbsolutePath();
+
+ // Fields from TableTestBase that we need
+ protected final String commitUser = UUID.randomUUID().toString();
+ protected Path warehouse;
+ protected Catalog catalog;
+ protected String database;
+
+ @BeforeEach
+ public void before() throws Exception {
+ database = "default";
+
+ // Create warehouse directory if it doesn't exist
+ if (!Files.exists(tempDir.resolve("warehouse"))) {
+ Files.createDirectories(tempDir.resolve("warehouse"));
+ }
+
+        warehouse = new Path(TraceableFileIO.SCHEME + "://" + tempDir.resolve("warehouse"));
+ Options options = new Options();
+ options.set("warehouse", warehouse.toUri().toString());
+ // Use preferIO to avoid FileIO loader conflicts (OSS vs Jindo)
+        CatalogContext context = CatalogContext.create(options, new TraceableFileIO.Loader(), null);
+ catalog = CatalogFactory.createCatalog(context);
+
+ // Create database if it doesn't exist
+ try {
+ catalog.createDatabase(database, false);
+ } catch (Catalog.DatabaseAlreadyExistException e) {
+ // Database already exists, ignore
+ }
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testJavaWriteReadPkTableLance() throws Exception {
+ Identifier identifier = identifier("mixed_test_pk_tablej_lance");
+ Schema schema =
+ Schema.newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("category", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .primaryKey("id")
+ .partitionKeys("category")
+ .option("dynamic-partition-overwrite", "false")
+ .option("bucket", "2")
+ .option("file.format", "lance")
+ .build();
+
+ catalog.createTable(identifier, schema, true);
+ Table table = catalog.getTable(identifier);
+ FileStoreTable fileStoreTable = (FileStoreTable) table;
+
+ try (StreamTableWrite write = fileStoreTable.newWrite(commitUser);
+                InnerTableCommit commit = fileStoreTable.newCommit(commitUser)) {
+
+ write.write(createRow(1, "Apple", "Fruit", 1.5));
+ write.write(createRow(2, "Banana", "Fruit", 0.8));
+ write.write(createRow(3, "Carrot", "Vegetable", 0.6));
+ write.write(createRow(4, "Broccoli", "Vegetable", 1.2));
+ write.write(createRow(5, "Chicken", "Meat", 5.0));
+ write.write(createRow(6, "Beef", "Meat", 8.0));
+
+ commit.commit(0, write.prepareCommit(true, 0));
+ }
+
+ List<Split> splits =
+                new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+ TableRead read = fileStoreTable.newRead();
+ List<String> res =
+ getResult(
+ read,
+ splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+ assertThat(res)
+ .containsExactlyInAnyOrder(
+ "1, Apple, Fruit, 1.5",
+ "2, Banana, Fruit, 0.8",
+ "3, Carrot, Vegetable, 0.6",
+ "4, Broccoli, Vegetable, 1.2",
+ "5, Chicken, Meat, 5.0",
+ "6, Beef, Meat, 8.0");
+ }
+
+ @Test
+ @EnabledIfSystemProperty(named = "run.e2e.tests", matches = "true")
+ public void testReadPkTableLance() throws Exception {
+        // Known issue: Reading Python-written Lance files in Java can crash the JVM due to a
+        // missing Tokio runtime. The error is:
+        // "there is no reactor running, must be called from the context of a Tokio 1.x runtime"
+        //
+        // This is a limitation of the lance-core Java bindings. The Rust native library requires
+        // a Tokio runtime for certain operations when reading files written by Python (which may
+        // use different encoding formats). Java-written files can be read successfully because
+        // they use synchronous APIs that don't require Tokio.
+        //
+        // Workaround: Try to "warm up" the Tokio runtime by reading a Java-written file first.
+        // This may initialize the Tokio runtime if it's created on first use.
+        try {
+            Identifier warmupIdentifier = identifier("mixed_test_pk_tablej_lance");
+            try {
+                Table warmupTable = catalog.getTable(warmupIdentifier);
+                FileStoreTable warmupFileStoreTable = (FileStoreTable) warmupTable;
+                List<Split> warmupSplits =
+                        new ArrayList<>(
+                                warmupFileStoreTable.newSnapshotReader().read().dataSplits());
+                if (!warmupSplits.isEmpty()) {
+                    TableRead warmupRead = warmupFileStoreTable.newRead();
+                    // Try to read at least one batch to initialize the Tokio runtime
+                    getResult(
+                            warmupRead,
+                            warmupSplits.subList(0, Math.min(1, warmupSplits.size())),
+                            row ->
+                                    DataFormatTestUtil.toStringNoRowKind(
+                                            row, warmupTable.rowType()));
+                }
+            } catch (Catalog.TableNotExistException e) {
+                // Table doesn't exist, skip warm-up
+            }
+        } catch (Exception e) {
+            // Ignore warm-up errors, continue with the actual test
+        }
+
+        Identifier identifier = identifier("mixed_test_pk_tablep_lance");
+        Table table = catalog.getTable(identifier);
+        FileStoreTable fileStoreTable = (FileStoreTable) table;
+        List<Split> splits =
+                new ArrayList<>(fileStoreTable.newSnapshotReader().read().dataSplits());
+        TableRead read = fileStoreTable.newRead();
+        List<String> res =
+                getResult(
+                        read,
+                        splits,
+                        row -> DataFormatTestUtil.toStringNoRowKind(row, table.rowType()));
+        System.out.println("Format: lance, Result: " + res);
+        assertThat(res)
+                .containsExactlyInAnyOrder(
+                        "1, Apple, Fruit, 1.5",
+                        "2, Banana, Fruit, 0.8",
+                        "3, Carrot, Vegetable, 0.6",
+                        "4, Broccoli, Vegetable, 1.2",
+                        "5, Chicken, Meat, 5.0",
+                        "6, Beef, Meat, 8.0");
+ }
+
+ // Helper method from TableTestBase
+ protected Identifier identifier(String tableName) {
+ return new Identifier(database, tableName);
+ }
+
+    private static InternalRow createRow(int id, String name, String category, double value) {
+ return GenericRow.of(
+                id, BinaryString.fromString(name), BinaryString.fromString(category), value);
+ }
+}
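
The @BeforeAll block in this new test reduces to a once-per-JVM, best-effort warm-up guard around the native library. A condensed sketch of that pattern, where a hypothetical nativeRoundTrip callback stands in for the LanceWriter/LanceReader write-then-read above:

    // Sketch of the warm-up pattern in initializeTokioRuntime(): run a native
    // round trip at most once per JVM and never let its failure fail the suite.
    final class NativeWarmup {
        private static boolean initialized = false;
        private static final Object LOCK = new Object();

        static void ensureInitialized(Runnable nativeRoundTrip) {
            synchronized (LOCK) {
                if (initialized) {
                    return;
                }
                try {
                    // May load the JNI library and start its async (Tokio) runtime.
                    nativeRoundTrip.run();
                } catch (Exception e) {
                    // Best effort only; reading Java-written files still works.
                }
                initialized = true;
            }
        }
    }
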
diff --git a/paimon-python/dev/run_mixed_tests.sh b/paimon-python/dev/run_mixed_tests.sh
index 8387e79eb9..404d7cb888 100755
--- a/paimon-python/dev/run_mixed_tests.sh
+++ b/paimon-python/dev/run_mixed_tests.sh
@@ -60,14 +60,35 @@ cleanup_warehouse() {
# Function to run Java test
run_java_write_test() {
-    echo -e "${YELLOW}=== Step 1: Running Java Test (JavaPyE2ETest.testJavaWriteRead) ===${NC}"
+    echo -e "${YELLOW}=== Step 1: Running Java Write Tests (Parquet + Lance) ===${NC}"
cd "$PROJECT_ROOT"
- # Run the specific Java test method
- echo "Running Maven test for JavaPyE2ETest.testJavaWriteRead..."
-    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteRead -pl paimon-core -q -Drun.e2e.tests=true; then
- echo -e "${GREEN}✓ Java test completed successfully${NC}"
+ # Run the Java test method for parquet format
+    echo "Running Maven test for JavaPyE2ETest.testJavaWriteReadPkTable (Parquet)..."
+    echo "Note: Maven may download dependencies on first run; this may take a while..."
+ local parquet_result=0
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testJavaWriteReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java write parquet test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java write parquet test failed${NC}"
+ parquet_result=1
+ fi
+
+ echo ""
+
+ # Run the Java test method for lance format
+    echo "Running Maven test for JavaPyLanceE2ETest.testJavaWriteReadPkTableLance (Lance)..."
+    echo "Note: Maven may download dependencies on first run; this may take a while..."
+ local lance_result=0
+    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testJavaWriteReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java write lance test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java write lance test failed${NC}"
+ lance_result=1
+ fi
+
+ if [[ $parquet_result -eq 0 && $lance_result -eq 0 ]]; then
return 0
else
echo -e "${RED}✗ Java test failed${NC}"
@@ -77,13 +98,13 @@ run_java_write_test() {
# Function to run Python test
run_python_read_test() {
-    echo -e "${YELLOW}=== Step 2: Running Python Test (JavaPyReadWriteTest.testRead) ===${NC}"
+    echo -e "${YELLOW}=== Step 2: Running Python Test (JavaPyReadWriteTest.test_read_pk_table) ===${NC}"
cd "$PAIMON_PYTHON_DIR"
- # Run the specific Python test method
- echo "Running Python test for JavaPyReadWriteTest.testRead..."
-    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_read -v; then
+    # Run the parameterized Python test method (runs for both parquet and lance)
+    echo "Running Python test for JavaPyReadWriteTest.test_read_pk_table..."
+    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_read_pk_table" -v; then
echo -e "${GREEN}✓ Python test completed successfully${NC}"
# source deactivate
return 0
@@ -96,13 +117,13 @@ run_python_read_test() {
# Function to run Python Write test for Python-Write-Java-Read scenario
run_python_write_test() {
-    echo -e "${YELLOW}=== Step 3: Running Python Write Test (JavaPyReadWriteTest.test_py_write_read) ===${NC}"
+    echo -e "${YELLOW}=== Step 3: Running Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table) ===${NC}"
cd "$PAIMON_PYTHON_DIR"
- # Run the specific Python test method for writing data
-    echo "Running Python test for JavaPyReadWriteTest.test_py_write_read (Python Write)..."
-    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest::test_py_write_read -v; then
+    # Run the parameterized Python test method for writing data (runs for both parquet and lance)
+    echo "Running Python test for JavaPyReadWriteTest.test_py_write_read_pk_table (Python Write)..."
+    if python -m pytest java_py_read_write_test.py::JavaPyReadWriteTest -k "test_py_write_read_pk_table" -v; then
echo -e "${GREEN}✓ Python write test completed successfully${NC}"
return 0
else
@@ -113,17 +134,37 @@ run_python_write_test() {
# Function to run Java Read test for Python-Write-Java-Read scenario
run_java_read_test() {
-    echo -e "${YELLOW}=== Step 4: Running Java Read Test (JavaPyE2ETest.testRead) ===${NC}"
+    echo -e "${YELLOW}=== Step 4: Running Java Read Test (JavaPyE2ETest.testReadPkTable for parquet, JavaPyLanceE2ETest.testReadPkTableLance for lance) ===${NC}"
cd "$PROJECT_ROOT"
- # Run the specific Java test method for reading Python-written data
- echo "Running Maven test for JavaPyE2ETest.testRead (Java Read)..."
-    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testRead -pl paimon-core -q -Drun.e2e.tests=true; then
- echo -e "${GREEN}✓ Java read test completed successfully${NC}"
+ # Run Java test for parquet format in paimon-core
+    echo "Running Maven test for JavaPyE2ETest.testReadPkTable (Java Read Parquet)..."
+    echo "Note: Maven may download dependencies on first run; this may take a while..."
+ local parquet_result=0
+    if mvn test -Dtest=org.apache.paimon.JavaPyE2ETest#testReadPkTable -pl paimon-core -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java read parquet test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java read parquet test failed${NC}"
+ parquet_result=1
+ fi
+
+ echo ""
+
+ # Run Java test for lance format in paimon-lance
+    echo "Running Maven test for JavaPyLanceE2ETest.testReadPkTableLance (Java Read Lance)..."
+    echo "Note: Maven may download dependencies on first run; this may take a while..."
+ local lance_result=0
+    if mvn test -Dtest=org.apache.paimon.JavaPyLanceE2ETest#testReadPkTableLance -pl paimon-lance -Drun.e2e.tests=true; then
+ echo -e "${GREEN}✓ Java read lance test completed successfully${NC}"
+ else
+ echo -e "${RED}✗ Java read lance test failed${NC}"
+ lance_result=1
+ fi
+
+ if [[ $parquet_result -eq 0 && $lance_result -eq 0 ]]; then
return 0
else
- echo -e "${RED}✗ Java read test failed${NC}"
return 1
fi
}
@@ -167,7 +208,7 @@ main() {
echo ""
fi
- # Run Java read test
+ # Run Java read test (handles both parquet and lance)
if ! run_java_read_test; then
java_read_result=1
fi
@@ -176,27 +217,27 @@ main() {
echo -e "${YELLOW}=== Test Results Summary ===${NC}"
if [[ $java_write_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Java Write Test (JavaPyE2ETest.testJavaWriteRead): PASSED${NC}"
+ echo -e "${GREEN}✓ Java Write Test (Parquet + Lance): PASSED${NC}"
else
-        echo -e "${RED}✗ Java Write Test (JavaPyE2ETest.testJavaWriteRead): FAILED${NC}"
+ echo -e "${RED}✗ Java Write Test (Parquet + Lance): FAILED${NC}"
fi
if [[ $python_read_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Python Read Test (JavaPyReadWriteTest.testRead): PASSED${NC}"
+        echo -e "${GREEN}✓ Python Read Test (JavaPyReadWriteTest.test_read_pk_table): PASSED${NC}"
else
-        echo -e "${RED}✗ Python Read Test (JavaPyReadWriteTest.testRead): FAILED${NC}"
+        echo -e "${RED}✗ Python Read Test (JavaPyReadWriteTest.test_read_pk_table): FAILED${NC}"
fi
if [[ $python_write_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Python Write Test (JavaPyReadWriteTest.test_py_write_read): PASSED${NC}"
+        echo -e "${GREEN}✓ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): PASSED${NC}"
else
-        echo -e "${RED}✗ Python Write Test (JavaPyReadWriteTest.test_py_write_read): FAILED${NC}"
+        echo -e "${RED}✗ Python Write Test (JavaPyReadWriteTest.test_py_write_read_pk_table): FAILED${NC}"
fi
if [[ $java_read_result -eq 0 ]]; then
-        echo -e "${GREEN}✓ Java Read Test (JavaPyE2ETest.testRead): PASSED${NC}"
+ echo -e "${GREEN}✓ Java Read Test (Parquet + Lance): PASSED${NC}"
else
- echo -e "${RED}✗ Java Read Test (JavaPyE2ETest.testRead): FAILED${NC}"
+ echo -e "${RED}✗ Java Read Test (Parquet + Lance): FAILED${NC}"
fi
echo ""
diff --git a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
index 405484d505..3213ec87d3 100644
--- a/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
+++ b/paimon-python/pypaimon/tests/e2e/java_py_read_write_test.py
@@ -21,6 +21,7 @@ import unittest
import pandas as pd
import pyarrow as pa
+from parameterized import parameterized
from pypaimon.catalog.catalog_factory import CatalogFactory
from pypaimon.schema.schema import Schema
@@ -35,7 +36,7 @@ class JavaPyReadWriteTest(unittest.TestCase):
})
cls.catalog.create_database('default', True)
- def test_py_write_read(self):
+ def test_py_write_read_append_table(self):
pa_schema = pa.schema([
('id', pa.int32()),
('name', pa.string()),
@@ -49,8 +50,8 @@ class JavaPyReadWriteTest(unittest.TestCase):
options={'dynamic-partition-overwrite': 'false'}
)
- self.catalog.create_table('default.mixed_test_tablep', schema, False)
- table = self.catalog.get_table('default.mixed_test_tablep')
+        self.catalog.create_table('default.mixed_test_append_tablep', schema, False)
+ table = self.catalog.get_table('default.mixed_test_append_tablep')
initial_data = pd.DataFrame({
'id': [1, 2, 3, 4, 5, 6],
@@ -75,15 +76,107 @@ class JavaPyReadWriteTest(unittest.TestCase):
initial_result = table_read.to_pandas(table_scan.plan().splits())
print(initial_result)
self.assertEqual(len(initial_result), 6)
- self.assertListEqual(
- initial_result['name'].tolist(),
- ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef']
- )
+ # Data order may vary due to partitioning/bucketing, so compare as sets
+        expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
+ actual_names = set(initial_result['name'].tolist())
+ self.assertEqual(actual_names, expected_names)
- def test_read(self):
- table = self.catalog.get_table('default.mixed_test_tablej')
+ def test_read_append_table(self):
+ table = self.catalog.get_table('default.mixed_test_append_tablej')
read_builder = table.new_read_builder()
table_scan = read_builder.new_scan()
table_read = read_builder.new_read()
res = table_read.to_pandas(table_scan.plan().splits())
print(res)
+
+ @parameterized.expand([
+ ('parquet',),
+ ('lance',),
+ ])
+ def test_py_write_read_pk_table(self, file_format):
+ pa_schema = pa.schema([
+ ('id', pa.int32()),
+ ('name', pa.string()),
+ ('category', pa.string()),
+ ('value', pa.float64())
+ ])
+
+ table_name = f'default.mixed_test_pk_tablep_{file_format}'
+ schema = Schema.from_pyarrow_schema(
+ pa_schema,
+ partition_keys=['category'],
+ primary_keys=['id'],
+ options={
+ 'dynamic-partition-overwrite': 'false',
+ 'bucket': '2',
+ 'file.format': file_format
+ }
+ )
+
+ try:
+ existing_table = self.catalog.get_table(table_name)
+ table_path = self.catalog.get_table_path(existing_table.identifier)
+ if self.catalog.file_io.exists(table_path):
+ self.catalog.file_io.delete(table_path, recursive=True)
+ except Exception:
+ pass
+
+ self.catalog.create_table(table_name, schema, False)
+ table = self.catalog.get_table(table_name)
+
+ initial_data = pd.DataFrame({
+ 'id': [1, 2, 3, 4, 5, 6],
+            'name': ['Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'],
+            'category': ['Fruit', 'Fruit', 'Vegetable', 'Vegetable', 'Meat', 'Meat'],
+ 'value': [1.5, 0.8, 0.6, 1.2, 5.0, 8.0]
+ })
+ write_builder = table.new_batch_write_builder()
+ table_write = write_builder.new_write()
+ table_commit = write_builder.new_commit()
+
+ table_write.write_pandas(initial_data)
+ table_commit.commit(table_write.prepare_commit())
+ table_write.close()
+ table_commit.close()
+
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ initial_result = table_read.to_pandas(table_scan.plan().splits())
+ print(f"Format: {file_format}, Result:\n{initial_result}")
+ self.assertEqual(len(initial_result), 6)
+ # Data order may vary due to partitioning/bucketing, so compare as sets
+        expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
+ actual_names = set(initial_result['name'].tolist())
+ self.assertEqual(actual_names, expected_names)
+
+ @parameterized.expand([
+ ('parquet',),
+ ('lance',),
+ ])
+ def test_read_pk_table(self, file_format):
+ # For parquet, read from Java-written table (no format suffix)
+ # For lance, read from Java-written table (with format suffix)
+ if file_format == 'parquet':
+ table_name = 'default.mixed_test_pk_tablej'
+ else:
+ table_name = f'default.mixed_test_pk_tablej_{file_format}'
+ table = self.catalog.get_table(table_name)
+ read_builder = table.new_read_builder()
+ table_scan = read_builder.new_scan()
+ table_read = read_builder.new_read()
+ res = table_read.to_pandas(table_scan.plan().splits())
+ print(f"Format: {file_format}, Result:\n{res}")
+
+ # Verify data
+ self.assertEqual(len(res), 6)
+ # Data order may vary due to partitioning/bucketing, so compare as sets
+        expected_names = {'Apple', 'Banana', 'Carrot', 'Broccoli', 'Chicken', 'Beef'}
+ actual_names = set(res['name'].tolist())
+ self.assertEqual(actual_names, expected_names)
+
+ # For primary key tables, verify that _VALUE_KIND is written correctly
+ # by checking if we can read the raw data with system fields
+        # Note: Normal read filters out system fields, so we verify through Java read
+        # which explicitly reads KeyValue objects and checks valueKind
+        print(f"Format: {file_format}, Python read completed. ValueKind verification should be done in Java test.")
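
On the Java side, the ValueKind verification deferred in the comment above could look roughly like the sketch below. It reuses read, splits and getResult from the tests in this commit; InternalRow.getRowKind() and RowKind.shortString() are existing Paimon APIs, but the exact assertion is an illustration, not code from this change:

    // Sketch: rows of a freshly written primary key table should all carry
    // the INSERT row kind ("+I") when read back.
    List<String> kinds = getResult(read, splits, row -> row.getRowKind().shortString());
    assertThat(kinds).containsOnly("+I");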