This is an automated email from the ASF dual-hosted git repository.

timbrown pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/incubator-xtable.git


The following commit(s) were added to refs/heads/main by this push:
     new 8cceec71 Paimon Source Support (#742)
8cceec71 is described below

commit 8cceec7169012ee2cde2b9f1a72dfb49dc51588c
Author: Mike Dias <[email protected]>
AuthorDate: Thu Nov 6 09:30:23 2025 +1100

    Paimon Source Support (#742)
    
    * Paimon initial support
    
    # Conflicts:
    #       xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
    #       xtable-core/src/test/java/org/apache/xtable/GenericTable.java
    #       xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
    
    * Expanding imports and removing java 11 target
    
    * fix compilation issue
    
    * Add Paimon Unit Tests
    
    * Addressing review comments
    
    * Fix test
    
    * Parameterizing timestamp precision tests
    
    * Fix timestamp precision metadata
    
    * Fixing tests by removing the paimon catalog config in spark
---
 .gitignore                                         |   1 +
 pom.xml                                            |  13 +
 .../apache/xtable/model/storage/TableFormat.java   |   3 +-
 xtable-core/pom.xml                                |  11 +
 .../xtable/paimon/PaimonConversionSource.java      | 138 ++++++
 .../paimon/PaimonConversionSourceProvider.java     |  52 ++
 .../xtable/paimon/PaimonDataFileExtractor.java     |  99 ++++
 .../xtable/paimon/PaimonPartitionExtractor.java    | 112 +++++
 .../xtable/paimon/PaimonSchemaExtractor.java       | 231 +++++++++
 .../test/java/org/apache/xtable/GenericTable.java  |   7 +
 .../org/apache/xtable/ITConversionController.java  |  94 ++--
 .../java/org/apache/xtable/TestPaimonTable.java    | 307 ++++++++++++
 .../xtable/paimon/TestPaimonConversionSource.java  | 269 ++++++++++
 .../xtable/paimon/TestPaimonDataFileExtractor.java | 183 +++++++
 .../paimon/TestPaimonPartitionExtractor.java       | 196 ++++++++
 .../xtable/paimon/TestPaimonSchemaExtractor.java   | 547 +++++++++++++++++++++
 xtable-service/pom.xml                             |  10 +
 .../apache/xtable/service/ITConversionService.java |  10 +-
 18 files changed, 2244 insertions(+), 39 deletions(-)

diff --git a/.gitignore b/.gitignore
index 5a59990d..3e0130df 100644
--- a/.gitignore
+++ b/.gitignore
@@ -26,6 +26,7 @@ hs_err_pid*
 # Ignore java-version and idea files.
 .java-version
 .idea
+.vscode
 
 # Ignore Gradle project-specific cache directory
 .gradle
diff --git a/pom.xml b/pom.xml
index ff80b954..f232c1ea 100644
--- a/pom.xml
+++ b/pom.xml
@@ -88,6 +88,7 @@
         <spark.version.prefix>3.4</spark.version.prefix>
         <iceberg.version>1.4.2</iceberg.version>
         <delta.version>2.4.0</delta.version>
+        <paimon.version>1.2.0</paimon.version>
         <jackson.version>2.18.2</jackson.version>
         <spotless.version>2.43.0</spotless.version>
         <apache.rat.version>0.16.1</apache.rat.version>
@@ -333,6 +334,18 @@
                 <version>${delta.hive.version}</version>
             </dependency>
 
+            <!-- Paimon -->
+            <dependency>
+                <groupId>org.apache.paimon</groupId>
+                <artifactId>paimon-bundle</artifactId>
+                <version>${paimon.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.paimon</groupId>
+                <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+                <version>${paimon.version}</version>
+            </dependency>
+
             <!-- Spark -->
             <dependency>
                 <groupId>org.apache.spark</groupId>
diff --git a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
index 9d89de6a..9ea7943a 100644
--- a/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
+++ b/xtable-api/src/main/java/org/apache/xtable/model/storage/TableFormat.java
@@ -27,9 +27,10 @@ public class TableFormat {
   public static final String HUDI = "HUDI";
   public static final String ICEBERG = "ICEBERG";
   public static final String DELTA = "DELTA";
+  public static final String PAIMON = "PAIMON";
   public static final String PARQUET = "PARQUET";
 
   public static String[] values() {
-    return new String[] {"HUDI", "ICEBERG", "DELTA"};
+    return new String[] {"HUDI", "ICEBERG", "DELTA", "PAIMON"};
   }
 }
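
For orientation, the change above registers PAIMON in format discovery. A minimal sketch of what that means for callers (the demo class and main method are illustrative, not part of this patch):

    import java.util.Arrays;
    import org.apache.xtable.model.storage.TableFormat;

    public class TableFormatCheck {
      public static void main(String[] args) {
        // After this patch, values() enumerates PAIMON alongside HUDI, ICEBERG, DELTA.
        boolean paimonKnown = Arrays.asList(TableFormat.values()).contains(TableFormat.PAIMON);
        System.out.println("PAIMON registered: " + paimonKnown); // true
      }
    }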
diff --git a/xtable-core/pom.xml b/xtable-core/pom.xml
index 6bd5282c..ce9aaeaf 100644
--- a/xtable-core/pom.xml
+++ b/xtable-core/pom.xml
@@ -110,6 +110,17 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- Paimon dependencies -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <!-- Hadoop dependencies -->
         <dependency>
             <groupId>org.apache.hadoop</groupId>
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
new file mode 100644
index 00000000..1ef6dd99
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSource.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.List;
+
+import lombok.extern.log4j.Log4j2;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.schema.SchemaManager;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.SnapshotManager;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.*;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.InternalDataFile;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+@Log4j2
+public class PaimonConversionSource implements ConversionSource<Snapshot> {
+
+  private final FileStoreTable paimonTable;
+  private final SchemaManager schemaManager;
+  private final SnapshotManager snapshotManager;
+
+  private final PaimonDataFileExtractor dataFileExtractor = PaimonDataFileExtractor.getInstance();
+  private final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+  private final PaimonPartitionExtractor partitionSpecExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  public PaimonConversionSource(FileStoreTable paimonTable) {
+    this.paimonTable = paimonTable;
+    this.schemaManager = paimonTable.schemaManager();
+    this.snapshotManager = paimonTable.snapshotManager();
+  }
+
+  @Override
+  public InternalTable getTable(Snapshot snapshot) {
+    TableSchema paimonSchema = schemaManager.schema(snapshot.schemaId());
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    List<String> partitionKeys = paimonTable.partitionKeys();
+    List<InternalPartitionField> partitioningFields =
+        partitionSpecExtractor.toInternalPartitionFields(partitionKeys, internalSchema);
+
+    return InternalTable.builder()
+        .name(paimonTable.name())
+        .tableFormat(TableFormat.PAIMON)
+        .readSchema(internalSchema)
+        .layoutStrategy(DataLayoutStrategy.HIVE_STYLE_PARTITION)
+        .basePath(paimonTable.location().toString())
+        .partitioningFields(partitioningFields)
+        .latestCommitTime(Instant.ofEpochMilli(snapshot.timeMillis()))
+        .latestMetadataPath(snapshotManager.snapshotPath(snapshot.id()).toString())
+        .build();
+  }
+
+  @Override
+  public InternalTable getCurrentTable() {
+    Snapshot snapshot = getLastSnapshot();
+    return getTable(snapshot);
+  }
+
+  @Override
+  public InternalSnapshot getCurrentSnapshot() {
+    Snapshot snapshot = getLastSnapshot();
+    InternalTable internalTable = getTable(snapshot);
+    InternalSchema internalSchema = internalTable.getReadSchema();
+    List<InternalDataFile> dataFiles =
+        dataFileExtractor.toInternalDataFiles(paimonTable, snapshot, internalSchema);
+
+    return InternalSnapshot.builder()
+        .table(internalTable)
+        .version(Long.toString(snapshot.timeMillis()))
+        .partitionedDataFiles(PartitionFileGroup.fromFiles(dataFiles))
+        // TODO : Implement pending commits extraction, required for incremental sync
+        // https://github.com/apache/incubator-xtable/issues/754
+        .sourceIdentifier(getCommitIdentifier(snapshot))
+        .build();
+  }
+
+  private Snapshot getLastSnapshot() {
+    SnapshotManager snapshotManager = paimonTable.snapshotManager();
+    Snapshot snapshot = snapshotManager.latestSnapshot();
+    if (snapshot == null) {
+      throw new ReadException("No snapshots found for table " + paimonTable.name());
+    }
+    return snapshot;
+  }
+
+  @Override
+  public TableChange getTableChangeForCommit(Snapshot snapshot) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+  }
+
+  @Override
+  public CommitsBacklog<Snapshot> getCommitsBacklog(
+      InstantsForIncrementalSync instantsForIncrementalSync) {
+    throw new UnsupportedOperationException("Incremental Sync is not supported yet.");
+  }
+
+  @Override
+  public boolean isIncrementalSyncSafeFrom(Instant instant) {
+    return false; // Incremental sync is not supported yet
+  }
+
+  @Override
+  public String getCommitIdentifier(Snapshot snapshot) {
+    return Long.toString(snapshot.commitIdentifier());
+  }
+
+  @Override
+  public void close() throws IOException {}
+}
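
A minimal usage sketch for the source above, assuming a Paimon table already exists on disk. The demo class and local path are illustrative; the FileIO/FileStoreTableFactory calls mirror the provider in the next file:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.paimon.catalog.CatalogContext;
    import org.apache.paimon.fs.FileIO;
    import org.apache.paimon.fs.Path;
    import org.apache.paimon.options.Options;
    import org.apache.paimon.table.FileStoreTable;
    import org.apache.paimon.table.FileStoreTableFactory;
    import org.apache.xtable.model.InternalSnapshot;
    import org.apache.xtable.paimon.PaimonConversionSource;

    public class PaimonSourceDemo {
      public static void main(String[] args) throws IOException {
        // Illustrative location; point at an existing Paimon table directory.
        Path path = new Path("/tmp/warehouse/test_db.db/test_table");
        CatalogContext context = CatalogContext.create(new Options(), new Configuration());
        FileIO fileIO = FileIO.get(path, context);
        FileStoreTable table = FileStoreTableFactory.create(fileIO, path);

        // Latest snapshot's table metadata plus its data files, grouped by partition.
        PaimonConversionSource source = new PaimonConversionSource(table);
        InternalSnapshot snapshot = source.getCurrentSnapshot();
        System.out.println("Partition file groups: " + snapshot.getPartitionedDataFiles().size());
      }
    }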
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
new file mode 100644
index 00000000..64f16906
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonConversionSourceProvider.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.io.IOException;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.FileStoreTableFactory;
+
+import org.apache.xtable.conversion.ConversionSourceProvider;
+import org.apache.xtable.conversion.SourceTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.spi.extractor.ConversionSource;
+
+public class PaimonConversionSourceProvider extends ConversionSourceProvider<Snapshot> {
+  @Override
+  public ConversionSource<Snapshot> getConversionSourceInstance(SourceTable sourceTableConfig) {
+    try {
+      Options catalogOptions = new Options();
+      CatalogContext context = CatalogContext.create(catalogOptions, hadoopConf);
+
+      Path path = new Path(sourceTableConfig.getDataPath());
+      FileIO fileIO = FileIO.get(path, context);
+      FileStoreTable paimonTable = FileStoreTableFactory.create(fileIO, path);
+
+      return new PaimonConversionSource(paimonTable);
+    } catch (IOException e) {
+      throw new ReadException("Failed to read Paimon table from file system", e);
+    }
+  }
+}
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
new file mode 100644
index 00000000..68ccfc3e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonDataFileExtractor.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.*;
+
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.stat.ColumnStat;
+import org.apache.xtable.model.storage.InternalDataFile;
+
+public class PaimonDataFileExtractor {
+
+  private final PaimonPartitionExtractor partitionExtractor =
+      PaimonPartitionExtractor.getInstance();
+
+  private static final PaimonDataFileExtractor INSTANCE = new PaimonDataFileExtractor();
+
+  public static PaimonDataFileExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalDataFile> toInternalDataFiles(
+      FileStoreTable table, Snapshot snapshot, InternalSchema internalSchema) {
+    List<InternalDataFile> result = new ArrayList<>();
+    Iterator<ManifestEntry> manifestEntryIterator =
+        newSnapshotReader(table, snapshot).readFileIterator();
+    while (manifestEntryIterator.hasNext()) {
+      result.add(toInternalDataFile(table, manifestEntryIterator.next(), internalSchema));
+    }
+    return result;
+  }
+
+  private InternalDataFile toInternalDataFile(
+      FileStoreTable table, ManifestEntry entry, InternalSchema internalSchema) {
+    return InternalDataFile.builder()
+        .physicalPath(toFullPhysicalPath(table, entry))
+        .fileSizeBytes(entry.file().fileSize())
+        .lastModified(entry.file().creationTimeEpochMillis())
+        .recordCount(entry.file().rowCount())
+        .partitionValues(
+            partitionExtractor.toPartitionValues(table, entry.partition(), internalSchema))
+        .columnStats(toColumnStats(entry.file()))
+        .build();
+  }
+
+  private String toFullPhysicalPath(FileStoreTable table, ManifestEntry entry) {
+    String basePath = table.location().toString();
+    String bucketPath = "bucket-" + entry.bucket();
+    String filePath = entry.file().fileName();
+
+    Optional<String> partitionPath = partitionExtractor.toPartitionPath(table, entry.partition());
+    if (partitionPath.isPresent()) {
+      return String.join("/", basePath, partitionPath.get(), bucketPath, filePath);
+    } else {
+      return String.join("/", basePath, bucketPath, filePath);
+    }
+  }
+
+  private List<ColumnStat> toColumnStats(DataFileMeta file) {
+    // TODO: Implement logic to extract column stats from the file meta
+    // https://github.com/apache/incubator-xtable/issues/755
+    return Collections.emptyList();
+  }
+
+  private SnapshotReader newSnapshotReader(FileStoreTable table, Snapshot snapshot) {
+    // If the table has primary keys, we read only the top level files
+    // which means we can only consider fully compacted files.
+    if (!table.schema().primaryKeys().isEmpty()) {
+      return table
+          .newSnapshotReader()
+          .withLevel(table.coreOptions().numLevels() - 1)
+          .withSnapshot(snapshot);
+    } else {
+      return table.newSnapshotReader().withSnapshot(snapshot);
+    }
+  }
+}
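
The path assembly in toFullPhysicalPath follows Paimon's on-disk layout: an optional Hive-style partition segment, then a bucket-<n> directory, then the data file. A self-contained sketch with made-up values standing in for table.location(), the partition extractor's output, entry.bucket(), and entry.file().fileName():

    import java.util.Optional;

    public class PathAssemblyDemo {
      public static void main(String[] args) {
        // Hypothetical inputs; only the joining logic mirrors the extractor above.
        String basePath = "file:/warehouse/test_db.db/test_table";
        Optional<String> partitionPath = Optional.of("level=INFO");
        String bucketPath = "bucket-0";
        String fileName = "data-0000-1.parquet";

        String fullPath =
            partitionPath
                .map(p -> String.join("/", basePath, p, bucketPath, fileName))
                .orElseGet(() -> String.join("/", basePath, bucketPath, fileName));
        // file:/warehouse/test_db.db/test_table/level=INFO/bucket-0/data-0000-1.parquet
        System.out.println(fullPath);
      }
    }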
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
new file mode 100644
index 00000000..c8e6161e
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonPartitionExtractor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.utils.InternalRowPartitionComputer;
+
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
+/** Extracts partition spec for Paimon as identity transforms on partition keys. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonPartitionExtractor {
+
+  private static final PaimonPartitionExtractor INSTANCE = new PaimonPartitionExtractor();
+
+  public static PaimonPartitionExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public List<InternalPartitionField> toInternalPartitionFields(
+      List<String> partitionKeys, InternalSchema schema) {
+    if (partitionKeys == null || partitionKeys.isEmpty()) {
+      return Collections.emptyList();
+    }
+    return partitionKeys.stream()
+        .map(key -> toPartitionField(key, schema))
+        .collect(Collectors.toList());
+  }
+
+  public List<PartitionValue> toPartitionValues(
+      FileStoreTable table, BinaryRow partition, InternalSchema internalSchema) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    Map<String, String> partValues = partitionComputer.generatePartValues(partition);
+
+    if (partValues.isEmpty()) {
+      return Collections.emptyList();
+    }
+
+    List<PartitionValue> partitionValues = new ArrayList<>(partValues.size());
+    for (Map.Entry<String, String> entry : partValues.entrySet()) {
+      PartitionValue partitionValue =
+          PartitionValue.builder()
+              .partitionField(toPartitionField(entry.getKey(), internalSchema))
+              .range(Range.scalar(entry.getValue()))
+              .build();
+      partitionValues.add(partitionValue);
+    }
+    return partitionValues;
+  }
+
+  public Optional<String> toPartitionPath(FileStoreTable table, BinaryRow partition) {
+    InternalRowPartitionComputer partitionComputer = newPartitionComputer(table);
+    return partitionComputer.generatePartValues(partition).entrySet().stream()
+        .map(e -> e.getKey() + "=" + e.getValue())
+        .reduce((a, b) -> a + "/" + b);
+  }
+
+  private InternalPartitionField toPartitionField(String key, InternalSchema schema) {
+    InternalField sourceField =
+        findField(schema, key)
+            .orElseThrow(() -> new ReadException("Partition key not found in schema: " + key));
+    return InternalPartitionField.builder()
+        .sourceField(sourceField)
+        .transformType(PartitionTransformType.VALUE)
+        .build();
+  }
+
+  private Optional<InternalField> findField(InternalSchema schema, String path) {
+    return schema.getAllFields().stream().filter(f -> f.getPath().equals(path)).findFirst();
+  }
+
+  private InternalRowPartitionComputer newPartitionComputer(FileStoreTable table) {
+    return new InternalRowPartitionComputer(
+        table.coreOptions().partitionDefaultName(),
+        table.store().partitionType(),
+        table.partitionKeys().toArray(new String[0]),
+        table.coreOptions().legacyPartitionName());
+  }
+}
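
toPartitionPath reduces the computed key/value map into a Hive-style relative path. A standalone sketch of just that reduction (the sample keys are hypothetical; ordering comes from the partition computer in the real code):

    import java.util.LinkedHashMap;
    import java.util.Map;
    import java.util.Optional;

    public class PartitionPathDemo {
      public static void main(String[] args) {
        // Stand-in for InternalRowPartitionComputer.generatePartValues(partition).
        Map<String, String> partValues = new LinkedHashMap<>();
        partValues.put("region", "us");
        partValues.put("level", "INFO");

        Optional<String> partitionPath =
            partValues.entrySet().stream()
                .map(e -> e.getKey() + "=" + e.getValue())
                .reduce((a, b) -> a + "/" + b);
        // An empty map yields Optional.empty(), i.e. an unpartitioned table.
        System.out.println(partitionPath.orElse("<unpartitioned>")); // region=us/level=INFO
      }
    }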
diff --git a/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
new file mode 100644
index 00000000..6c9d824d
--- /dev/null
+++ b/xtable-core/src/main/java/org/apache/xtable/paimon/PaimonSchemaExtractor.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import lombok.AccessLevel;
+import lombok.NoArgsConstructor;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BinaryType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarBinaryType;
+import org.apache.paimon.types.VarCharType;
+
+import org.apache.xtable.exception.NotSupportedException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.schema.SchemaUtils;
+
+/** Converts Paimon RowType to XTable InternalSchema. */
+@NoArgsConstructor(access = AccessLevel.PRIVATE)
+public class PaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor INSTANCE = new PaimonSchemaExtractor();
+
+  public static PaimonSchemaExtractor getInstance() {
+    return INSTANCE;
+  }
+
+  public InternalSchema toInternalSchema(TableSchema paimonSchema) {
+    RowType rowType = paimonSchema.logicalRowType();
+    List<InternalField> fields = toInternalFields(rowType);
+    return InternalSchema.builder()
+        .name("record")
+        .dataType(InternalType.RECORD)
+        .fields(fields)
+        .recordKeyFields(primaryKeyFields(paimonSchema, fields))
+        .build();
+  }
+
+  private List<InternalField> primaryKeyFields(
+      TableSchema paimonSchema, List<InternalField> internalFields) {
+    List<String> keys = paimonSchema.primaryKeys();
+    return internalFields.stream()
+        .filter(f -> keys.contains(f.getName()))
+        .collect(Collectors.toList());
+  }
+
+  private List<InternalField> toInternalFields(RowType rowType) {
+    List<InternalField> fields = new ArrayList<>(rowType.getFieldCount());
+    for (int i = 0; i < rowType.getFieldCount(); i++) {
+      DataField dataField = rowType.getFields().get(i);
+      InternalField internalField =
+          InternalField.builder()
+              .name(dataField.name())
+              .fieldId(dataField.id())
+              .parentPath(null)
+              .schema(
+                  fromPaimonType(dataField.type(), dataField.name(), dataField.type().isNullable()))
+              .defaultValue(
+                  dataField.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+              .build();
+      fields.add(internalField);
+    }
+    return fields;
+  }
+
+  private InternalSchema fromPaimonType(DataType type, String fieldPath, boolean nullable) {
+    InternalType internalType;
+    List<InternalField> fields = null;
+    Map<InternalSchema.MetadataKey, Object> metadata = null;
+    if (type instanceof CharType || type instanceof VarCharType) {
+      internalType = InternalType.STRING;
+    } else if (type instanceof BooleanType) {
+      internalType = InternalType.BOOLEAN;
+    } else if (type instanceof TinyIntType
+        || type instanceof SmallIntType
+        || type instanceof IntType) {
+      internalType = InternalType.INT;
+    } else if (type instanceof BigIntType) {
+      internalType = InternalType.LONG;
+    } else if (type instanceof FloatType) {
+      internalType = InternalType.FLOAT;
+    } else if (type instanceof DoubleType) {
+      internalType = InternalType.DOUBLE;
+    } else if (type instanceof BinaryType || type instanceof VarBinaryType) {
+      internalType = InternalType.BYTES;
+    } else if (type instanceof DateType) {
+      internalType = InternalType.DATE;
+    } else if (type instanceof TimestampType || type instanceof LocalZonedTimestampType) {
+      internalType = InternalType.TIMESTAMP;
+      int precision;
+      if (type instanceof TimestampType) {
+        precision = ((TimestampType) type).getPrecision();
+      } else {
+        precision = ((LocalZonedTimestampType) type).getPrecision();
+      }
+
+      InternalSchema.MetadataValue precisionValue;
+      if (precision <= 3) {
+        precisionValue = InternalSchema.MetadataValue.MILLIS;
+      } else if (precision <= 6) {
+        precisionValue = InternalSchema.MetadataValue.MICROS;
+      } else {
+        precisionValue = InternalSchema.MetadataValue.NANOS;
+      }
+
+      metadata =
+          Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION, precisionValue);
+    } else if (type instanceof DecimalType) {
+      DecimalType d = (DecimalType) type;
+      metadata = new HashMap<>(2, 1.0f);
+      metadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, d.getPrecision());
+      metadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, d.getScale());
+      internalType = InternalType.DECIMAL;
+    } else if (type instanceof RowType) {
+      RowType rt = (RowType) type;
+      List<InternalField> nested = new ArrayList<>(rt.getFieldCount());
+      for (DataField df : rt.getFields()) {
+        nested.add(
+            InternalField.builder()
+                .name(df.name())
+                .fieldId(df.id())
+                .parentPath(fieldPath)
+                .schema(
+                    fromPaimonType(
+                        df.type(),
+                        SchemaUtils.getFullyQualifiedPath(fieldPath, df.name()),
+                        df.type().isNullable()))
+                .defaultValue(
+                    df.type().isNullable() ? InternalField.Constants.NULL_DEFAULT_VALUE : null)
+                .build());
+      }
+      fields = nested;
+      internalType = InternalType.RECORD;
+    } else if (type instanceof ArrayType) {
+      ArrayType at = (ArrayType) type;
+      InternalSchema elementSchema =
+          fromPaimonType(
+              at.getElementType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME),
+              at.getElementType().isNullable());
+      InternalField elementField =
+          InternalField.builder()
+              .name(InternalField.Constants.ARRAY_ELEMENT_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(elementSchema)
+              .build();
+      fields = Collections.singletonList(elementField);
+      internalType = InternalType.LIST;
+    } else if (type instanceof MapType) {
+      MapType mt = (MapType) type;
+      InternalSchema keySchema =
+          fromPaimonType(
+              mt.getKeyType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.MAP_KEY_FIELD_NAME),
+              false);
+      InternalField keyField =
+          InternalField.builder()
+              .name(InternalField.Constants.MAP_KEY_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(keySchema)
+              .build();
+      InternalSchema valueSchema =
+          fromPaimonType(
+              mt.getValueType(),
+              SchemaUtils.getFullyQualifiedPath(
+                  fieldPath, InternalField.Constants.MAP_VALUE_FIELD_NAME),
+              mt.getValueType().isNullable());
+      InternalField valueField =
+          InternalField.builder()
+              .name(InternalField.Constants.MAP_VALUE_FIELD_NAME)
+              .parentPath(fieldPath)
+              .schema(valueSchema)
+              .build();
+      fields = Arrays.asList(keyField, valueField);
+      internalType = InternalType.MAP;
+    } else {
+      throw new NotSupportedException("Unsupported Paimon type: " + type.asSQLString());
+    }
+
+    return InternalSchema.builder()
+        .name(type.asSQLString())
+        .dataType(internalType)
+        .isNullable(nullable)
+        .metadata(metadata)
+        .fields(fields)
+        .build();
+  }
+}
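
The timestamp branch above buckets Paimon's TIMESTAMP(p) and TIMESTAMP_LTZ(p) precision into the three internal metadata values. A standalone restatement of just that mapping (plain strings stand in for InternalSchema.MetadataValue; the demo class is illustrative):

    public class TimestampPrecisionDemo {
      // Mirrors the extractor: p <= 3 -> MILLIS, p <= 6 -> MICROS, otherwise NANOS.
      static String toPrecisionMetadata(int precision) {
        if (precision <= 3) {
          return "MILLIS";
        } else if (precision <= 6) {
          return "MICROS";
        }
        return "NANOS";
      }

      public static void main(String[] args) {
        System.out.println(toPrecisionMetadata(3)); // MILLIS
        System.out.println(toPrecisionMetadata(6)); // MICROS
        System.out.println(toPrecisionMetadata(9)); // NANOS
      }
    }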
diff --git a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
index 14395e0d..a5670eac 100644
--- a/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
+++ b/xtable-core/src/test/java/org/apache/xtable/GenericTable.java
@@ -21,6 +21,7 @@ package org.apache.xtable;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 
 import java.nio.file.Path;
@@ -91,6 +92,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
       case ICEBERG:
         return TestIcebergTable.forStandardSchemaAndPartitioning(
             tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+      case PAIMON:
+        return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), false);
       default:
         throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
     }
@@ -113,6 +117,9 @@ public interface GenericTable<T, Q> extends AutoCloseable {
       case ICEBERG:
         return TestIcebergTable.forSchemaWithAdditionalColumnsAndPartitioning(
             tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration());
+      case PAIMON:
+        return TestPaimonTable.createTable(
+            tableName, isPartitioned ? "level" : null, tempDir, jsc.hadoopConfiguration(), true);
       default:
         throw new IllegalArgumentException("Unsupported source format: " + sourceFormat);
     }
diff --git a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
index bda54c0f..b8ea413b 100644
--- a/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
+++ b/xtable-core/src/test/java/org/apache/xtable/ITConversionController.java
@@ -24,6 +24,7 @@ import static org.apache.xtable.hudi.HudiTestUtil.PartitionConfig;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 import static org.apache.xtable.model.storage.TableFormat.PARQUET;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
@@ -104,9 +105,11 @@ import org.apache.xtable.iceberg.IcebergConversionSourceProvider;
 import org.apache.xtable.iceberg.TestIcebergDataHelper;
 import org.apache.xtable.model.storage.TableFormat;
 import org.apache.xtable.model.sync.SyncMode;
+import org.apache.xtable.paimon.PaimonConversionSourceProvider;
 
 public class ITConversionController {
   @TempDir public static Path tempDir;
+
   private static final DateTimeFormatter DATE_FORMAT =
         DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS").withZone(ZoneId.of("UTC"));
   private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
@@ -117,12 +120,14 @@ public class ITConversionController {
   @BeforeAll
   public static void setupOnce() {
     SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
     sparkSession =
         SparkSession.builder().config(HoodieReadClient.addHoodieSupport(sparkConf)).getOrCreate();
     sparkSession
         .sparkContext()
         .hadoopConfiguration()
         .set("parquet.avro.write-old-list-structure", "false");
+
     jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
   }
 
@@ -142,10 +147,13 @@ public class ITConversionController {
 
   private static Stream<Arguments> generateTestParametersForFormatsSyncModesAndPartitioning() {
     List<Arguments> arguments = new ArrayList<>();
-    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+    for (String sourceFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
       for (SyncMode syncMode : SyncMode.values()) {
+        if (sourceFormat.equals(PAIMON) && syncMode == SyncMode.INCREMENTAL)
+          continue; // Paimon does not support incremental sync yet
+
         for (boolean isPartitioned : new boolean[] {true, false}) {
-          arguments.add(Arguments.of(sourceTableFormat, syncMode, isPartitioned));
+          arguments.add(Arguments.of(sourceFormat, syncMode, isPartitioned));
         }
       }
     }
@@ -170,23 +178,37 @@ public class ITConversionController {
   }
 
   private ConversionSourceProvider<?> getConversionSourceProvider(String sourceTableFormat) {
-    if (sourceTableFormat.equalsIgnoreCase(HUDI)) {
-      ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
-          new HudiConversionSourceProvider();
-      hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return hudiConversionSourceProvider;
-    } else if (sourceTableFormat.equalsIgnoreCase(DELTA)) {
-      ConversionSourceProvider<Long> deltaConversionSourceProvider =
-          new DeltaConversionSourceProvider();
-      deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return deltaConversionSourceProvider;
-    } else if (sourceTableFormat.equalsIgnoreCase(ICEBERG)) {
-      ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
-          new IcebergConversionSourceProvider();
-      icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
-      return icebergConversionSourceProvider;
-    } else {
-      throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
+    switch (sourceTableFormat.toUpperCase()) {
+      case HUDI:
+        {
+          ConversionSourceProvider<HoodieInstant> hudiConversionSourceProvider =
+              new HudiConversionSourceProvider();
+          hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return hudiConversionSourceProvider;
+        }
+      case DELTA:
+        {
+          ConversionSourceProvider<Long> deltaConversionSourceProvider =
+              new DeltaConversionSourceProvider();
+          deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return deltaConversionSourceProvider;
+        }
+      case ICEBERG:
+        {
+          ConversionSourceProvider<Snapshot> icebergConversionSourceProvider =
+              new IcebergConversionSourceProvider();
+          icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return icebergConversionSourceProvider;
+        }
+      case PAIMON:
+        {
+          ConversionSourceProvider<org.apache.paimon.Snapshot> paimonConversionSourceProvider =
+              new PaimonConversionSourceProvider();
+          paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
+          return paimonConversionSourceProvider;
+        }
+      default:
+        throw new IllegalArgumentException("Unsupported source format: " + sourceTableFormat);
     }
   }
 
@@ -486,11 +508,9 @@ public class ITConversionController {
 
   private static List<String> getOtherFormats(String sourceTableFormat) {
     return Arrays.stream(TableFormat.values())
-        .filter(
-            format ->
-                !format.equals(sourceTableFormat)
-                    && !format.equals(
-                        PARQUET)) // excluded file formats because upset, insert etc. not supported
+        .filter(fmt -> !fmt.equals(sourceTableFormat))
+        .filter(fmt -> !fmt.equals(PAIMON)) // Paimon target is not supported yet
+        .filter(fmt -> !fmt.equals(PARQUET)) // upserts/inserts are not supported in Parquet
         .collect(Collectors.toList());
   }
 
@@ -911,34 +931,34 @@ public class ITConversionController {
                     }));
 
     String[] selectColumnsArr = sourceTable.getColumnsToSelect().toArray(new String[] {});
-    List<String> dataset1Rows = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
+    List<String> sourceRowsList = sourceRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
     targetRowsByFormat.forEach(
-        (format, targetRows) -> {
-          List<String> dataset2Rows =
+        (targetFormat, targetRows) -> {
+          List<String> targetRowsList =
               targetRows.selectExpr(selectColumnsArr).toJSON().collectAsList();
           assertEquals(
-              dataset1Rows.size(),
-              dataset2Rows.size(),
+              sourceRowsList.size(),
+              targetRowsList.size(),
              String.format(
                  "Datasets have different row counts when reading from Spark. Source: %s, Target: %s",
-                  sourceFormat, format));
+                  sourceFormat, targetFormat));
           // sanity check the count to ensure test is set up properly
           if (expectedCount != null) {
-            assertEquals(expectedCount, dataset1Rows.size());
+            assertEquals(expectedCount, sourceRowsList.size());
           } else {
             // if count is not known ahead of time, ensure datasets are non-empty
-            assertFalse(dataset1Rows.isEmpty());
+            assertFalse(sourceRowsList.isEmpty());
           }
 
-          if (containsUUIDFields(dataset1Rows) && containsUUIDFields(dataset2Rows)) {
-            compareDatasetWithUUID(dataset1Rows, dataset2Rows);
+          if (containsUUIDFields(sourceRowsList) && containsUUIDFields(targetRowsList)) {
+            compareDatasetWithUUID(sourceRowsList, targetRowsList);
           } else {
             assertEquals(
-                dataset1Rows,
-                dataset2Rows,
+                sourceRowsList,
+                targetRowsList,
                String.format(
                    "Datasets are not equivalent when reading from Spark. Source: %s, Target: %s",
-                    sourceFormat, format));
+                    sourceFormat, targetFormat));
           }
         });
   }
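
A quick restatement of the getOtherFormats change above: given a source format, sync targets are every known format except the source itself, PAIMON (no target support yet), and PARQUET. A sketch (the demo class is illustrative):

    import java.util.Arrays;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.xtable.model.storage.TableFormat;

    public class OtherFormatsDemo {
      public static void main(String[] args) {
        String sourceTableFormat = TableFormat.PAIMON;
        List<String> targets =
            Arrays.stream(TableFormat.values())
                .filter(fmt -> !fmt.equals(sourceTableFormat))
                .filter(fmt -> !fmt.equals(TableFormat.PAIMON))
                .filter(fmt -> !fmt.equals(TableFormat.PARQUET))
                .collect(Collectors.toList());
        System.out.println(targets); // [HUDI, ICEBERG, DELTA]
      }
    }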
diff --git a/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
new file mode 100644
index 00000000..55102007
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/TestPaimonTable.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.Timestamp;
+import org.apache.paimon.manifest.BucketEntry;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.LocalZonedTimestampType;
+import org.apache.paimon.types.RowKind;
+import org.apache.paimon.types.VarCharType;
+import org.apache.paimon.utils.ParameterUtils;
+
+public class TestPaimonTable implements GenericTable<GenericRow, String> {
+
+  private final Random random = new Random();
+  private final FileStoreTable paimonTable;
+  private final String partitionField;
+
+  public TestPaimonTable(FileStoreTable paimonTable, String partitionField) {
+    this.paimonTable = paimonTable;
+    this.partitionField = partitionField;
+  }
+
+  public static GenericTable<GenericRow, String> createTable(
+      String tableName,
+      String partitionField,
+      Path tempDir,
+      Configuration hadoopConf,
+      boolean additionalColumns) {
+    String basePath = initBasePath(tempDir, tableName);
+    Catalog catalog = createFilesystemCatalog(basePath, hadoopConf);
+    FileStoreTable paimonTable = createTable(catalog, partitionField, additionalColumns);
+
+    System.out.println(
+        "Initialized Paimon test table at base path: "
+            + basePath
+            + " with partition field: "
+            + partitionField
+            + " and additional columns: "
+            + additionalColumns);
+
+    return new TestPaimonTable(paimonTable, partitionField);
+  }
+
+  public static Catalog createFilesystemCatalog(String basePath, Configuration hadoopConf) {
+    CatalogContext context = CatalogContext.create(new org.apache.paimon.fs.Path(basePath));
+    return CatalogFactory.createCatalog(context);
+  }
+
+  public static FileStoreTable createTable(
+      Catalog catalog, String partitionField, boolean additionalColumns) {
+    try {
+      catalog.createDatabase("test_db", true);
+      Identifier identifier = Identifier.create("test_db", "test_table");
+      Schema schema = buildSchema(partitionField, additionalColumns);
+      catalog.createTable(identifier, schema, true);
+      return (FileStoreTable) catalog.getTable(identifier);
+    } catch (Exception e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  private static Schema buildSchema(String partitionField, boolean additionalColumns) {
+    Schema.Builder builder =
+        Schema.newBuilder()
+            .primaryKey("id")
+            .column("id", DataTypes.INT())
+            .column("name", DataTypes.STRING())
+            .column("value", DataTypes.DOUBLE())
+            .column("created_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+            .column("updated_at", DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE())
+            .column("is_active", DataTypes.BOOLEAN())
+            .column("description", DataTypes.VARCHAR(255))
+            .option("bucket", "1")
+            .option("bucket-key", "id")
+            .option("full-compaction.delta-commits", "1");
+
+    if (partitionField != null) {
+      builder
+          .primaryKey("id", partitionField)
+          .column(partitionField, DataTypes.STRING())
+          .partitionKeys(partitionField);
+    }
+
+    if (additionalColumns) {
+      builder.column("extra_info", DataTypes.STRING()).column("extra_value", DataTypes.DOUBLE());
+    }
+
+    return builder.build();
+  }
+
+  private GenericRow buildGenericRow(int rowIdx, TableSchema schema, String partitionValue) {
+    List<Object> rowValues = new ArrayList<>(schema.fields().size());
+    for (int i = 0; i < schema.fields().size(); i++) {
+      DataField field = schema.fields().get(i);
+      if (field.name().equals(partitionField)) {
+        rowValues.add(BinaryString.fromString(partitionValue));
+      } else if (field.type() instanceof IntType) {
+        rowValues.add(random.nextInt());
+      } else if (field.type() instanceof DoubleType) {
+        rowValues.add(random.nextDouble());
+      } else if (field.type() instanceof VarCharType) {
+        rowValues.add(BinaryString.fromString(field.name() + "_" + rowIdx + "_" + i));
+      } else if (field.type() instanceof LocalZonedTimestampType) {
+        rowValues.add(Timestamp.fromEpochMillis(System.currentTimeMillis()));
+      } else if (field.type() instanceof BooleanType) {
+        rowValues.add(random.nextBoolean());
+      } else {
+        throw new UnsupportedOperationException("Unsupported field type: " + field.type());
+      }
+    }
+
+    return GenericRow.of(rowValues.toArray());
+  }
+
+  private static String initBasePath(Path tempDir, String tableName) {
+    try {
+      Path basePath = tempDir.resolve(tableName);
+      Files.createDirectories(basePath);
+      return basePath.toUri().toString();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  @Override
+  public List<GenericRow> insertRows(int numRows) {
+    String partitionValue = LEVEL_VALUES.get(0);
+    return insertRecordsToPartition(numRows, partitionValue);
+  }
+
+  @Override
+  public List<GenericRow> insertRecordsForSpecialPartition(int numRows) {
+    return insertRecordsToPartition(numRows, SPECIAL_PARTITION_VALUE);
+  }
+
+  private List<GenericRow> insertRecordsToPartition(int numRows, String partitionValue) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      List<GenericRow> rows = new ArrayList<>(numRows);
+      for (int i = 0; i < numRows; i++) {
+        GenericRow row = buildGenericRow(i, paimonTable.schema(), partitionValue);
+        writer.write(row);
+        rows.add(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+      return rows;
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to insert rows into Paimon table", e);
+    }
+  }
+
+  @Override
+  public void upsertRows(List<GenericRow> rows) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        writer.write(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to upsert rows into Paimon table", e);
+    }
+  }
+
+  @Override
+  public void deleteRows(List<GenericRow> rows) {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (GenericRow row : rows) {
+        row.setRowKind(RowKind.DELETE);
+        writer.write(row);
+      }
+      commitWrites(batchWriteBuilder, writer);
+      compactTable();
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete rows from Paimon table", e);
+    }
+  }
+
+  private void compactTable() {
+    BatchWriteBuilder batchWriteBuilder = paimonTable.newBatchWriteBuilder();
+    SnapshotReader snapshotReader = paimonTable.newSnapshotReader();
+    try (BatchTableWrite writer = batchWriteBuilder.newWrite()) {
+      for (BucketEntry bucketEntry : snapshotReader.bucketEntries()) {
+        writer.compact(bucketEntry.partition(), bucketEntry.bucket(), true);
+      }
+      commitWrites(batchWriteBuilder, writer);
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to compact writes in Paimon table", e);
+    }
+  }
+
+  private static void commitWrites(BatchWriteBuilder batchWriteBuilder, BatchTableWrite writer)
+      throws Exception {
+    BatchTableCommit commit = batchWriteBuilder.newCommit();
+    List<CommitMessage> messages = writer.prepareCommit();
+    try {
+      commit.commit(messages);
+    } catch (Exception e) {
+      commit.abort(messages);
+      throw new RuntimeException("Failed to commit writes to Paimon table", e);
+    } finally {
+      commit.close();
+    }
+  }
+
+  @Override
+  public void deletePartition(String partitionValue) {
+    try (BatchTableCommit commit = paimonTable.newBatchWriteBuilder().newCommit()) {
+      commit.truncatePartitions(
+          ParameterUtils.getPartitions(partitionField + "=" + partitionValue));
+    } catch (Exception e) {
+      throw new RuntimeException("Failed to delete partition from Paimon table", e);
+    }
+  }
+
+  @Override
+  public void deleteSpecialPartition() {
+    deletePartition(SPECIAL_PARTITION_VALUE);
+  }
+
+  @Override
+  public String getBasePath() {
+    return paimonTable.location().toString();
+  }
+
+  @Override
+  public String getMetadataPath() {
+    return paimonTable.snapshotManager().snapshotDirectory().toString();
+  }
+
+  @Override
+  public String getOrderByColumn() {
+    return "id";
+  }
+
+  @Override
+  public void close() {}
+
+  @Override
+  public void reload() {}
+
+  @Override
+  public List<String> getColumnsToSelect() {
+    return paimonTable.schema().fieldNames().stream()
+        .filter(
+            // TODO Hudi thinks that paimon buckets are partition values, not sure how to handle it
+            // filtering out the partition field on the comparison for now
+            field -> !field.equals(partitionField))
+        .collect(Collectors.toList());
+  }
+
+  @Override
+  public String getFilterQuery() {
+    return "id % 2 = 0";
+  }
+
+  public FileStoreTable getPaimonTable() {
+    return paimonTable;
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
new file mode 100644
index 00000000..4d8f8c2b
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonConversionSource.java
@@ -0,0 +1,269 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.*;
+
+import java.nio.file.Path;
+import java.time.Instant;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.Snapshot;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.GenericTable;
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.InstantsForIncrementalSync;
+import org.apache.xtable.model.InternalSnapshot;
+import org.apache.xtable.model.InternalTable;
+import org.apache.xtable.model.storage.DataLayoutStrategy;
+import org.apache.xtable.model.storage.PartitionFileGroup;
+import org.apache.xtable.model.storage.TableFormat;
+
+public class TestPaimonConversionSource {
+
+  @TempDir private Path tempDir;
+
+  private Configuration hadoopConf;
+  private TestPaimonTable testTable;
+  private FileStoreTable paimonTable;
+  private PaimonConversionSource conversionSource;
+
+  @BeforeEach
+  void setUp() {
+    hadoopConf = new Configuration();
+    testTable =
+        ((TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, hadoopConf, false));
+    paimonTable = testTable.getPaimonTable();
+    conversionSource = new PaimonConversionSource(paimonTable);
+  }
+
+  @Test
+  void testGetTableWithPartitionedTable() {
+    testTable.insertRows(5);
+
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(snapshot);
+
+    InternalTable result = conversionSource.getTable(snapshot);
+
+    assertNotNull(result);
+    assertEquals("test_table", result.getName());
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+    assertTrue(result.getBasePath().contains("test_table"));
+    assertEquals(1, result.getPartitioningFields().size());
+    assertEquals("level", result.getPartitioningFields().get(0).getSourceField().getName());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+    assertNotNull(result.getLatestMetadataPath());
+  }
+
+  @Test
+  void testGetTableWithUnpartitionedTable() {
+    GenericTable<?, String> unpartitionedTable =
+        TestPaimonTable.createTable("unpartitioned_table", null, tempDir, hadoopConf, false);
+    FileStoreTable unpartitionedPaimonTable =
+        ((TestPaimonTable) unpartitionedTable).getPaimonTable();
+    PaimonConversionSource unpartitionedSource =
+        new PaimonConversionSource(unpartitionedPaimonTable);
+
+    unpartitionedTable.insertRows(3);
+
+    Snapshot snapshot = unpartitionedPaimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(snapshot);
+
+    InternalTable result = unpartitionedSource.getTable(snapshot);
+
+    assertNotNull(result);
+    assertEquals("unpartitioned_table", result.getName());
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+    assertTrue(result.getBasePath().contains("unpartitioned_table"));
+    assertEquals(0, result.getPartitioningFields().size());
+    assertEquals(Instant.ofEpochMilli(snapshot.timeMillis()), result.getLatestCommitTime());
+    assertNotNull(result.getLatestMetadataPath());
+  }
+
+  @Test
+  void testGetCurrentTableSuccess() {
+    testTable.insertRows(3);
+
+    InternalTable result = conversionSource.getCurrentTable();
+
+    assertNotNull(result);
+    assertEquals(TableFormat.PAIMON, result.getTableFormat());
+    assertEquals("test_table", result.getName());
+    assertNotNull(result.getReadSchema());
+    assertEquals(DataLayoutStrategy.HIVE_STYLE_PARTITION, result.getLayoutStrategy());
+    assertEquals(1, result.getPartitioningFields().size());
+  }
+
+  @Test
+  void testGetCurrentTableThrowsExceptionWhenNoSnapshot() {
+    GenericTable<?, String> emptyTable =
+        TestPaimonTable.createTable("empty_table", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentTable);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+  }
+
+  @Test
+  void testGetCurrentSnapshotSuccess() {
+    testTable.insertRows(5);
+
+    InternalSnapshot result = conversionSource.getCurrentSnapshot();
+
+    assertNotNull(result);
+    assertNotNull(result.getTable());
+    assertEquals(TableFormat.PAIMON, result.getTable().getTableFormat());
+    assertNotNull(result.getVersion());
+    assertNotNull(result.getSourceIdentifier());
+    assertNotNull(result.getPartitionedDataFiles());
+
+    List<PartitionFileGroup> partitionFileGroups = result.getPartitionedDataFiles();
+    assertFalse(partitionFileGroups.isEmpty());
+    assertTrue(partitionFileGroups.stream().allMatch(group -> !group.getDataFiles().isEmpty()));
+  }
+
+  @Test
+  void testGetCurrentSnapshotThrowsExceptionWhenNoSnapshot() {
+    GenericTable<?, String> emptyTable =
+        TestPaimonTable.createTable("empty_table2", "level", tempDir, hadoopConf, false);
+    FileStoreTable emptyPaimonTable = ((TestPaimonTable) emptyTable).getPaimonTable();
+    PaimonConversionSource emptySource = new PaimonConversionSource(emptyPaimonTable);
+
+    ReadException exception = assertThrows(ReadException.class, emptySource::getCurrentSnapshot);
+
+    assertTrue(exception.getMessage().contains("No snapshots found for table"));
+  }
+
+  @Test
+  void testGetTableChangeForCommitThrowsUnsupportedOperationException() {
+    testTable.insertRows(3);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+    UnsupportedOperationException exception =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> conversionSource.getTableChangeForCommit(snapshot));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+  }
+
+  @Test
+  void testGetCommitsBacklogThrowsUnsupportedOperationException() {
+    InstantsForIncrementalSync mockInstants =
+        InstantsForIncrementalSync.builder().lastSyncInstant(Instant.now()).build();
+
+    UnsupportedOperationException exception =
+        assertThrows(
+            UnsupportedOperationException.class,
+            () -> conversionSource.getCommitsBacklog(mockInstants));
+
+    assertEquals("Incremental Sync is not supported yet.", exception.getMessage());
+  }
+
+  @Test
+  void testIsIncrementalSyncSafeFromReturnsFalse() {
+    Instant testInstant = Instant.now();
+
+    boolean result = conversionSource.isIncrementalSyncSafeFrom(testInstant);
+
+    assertFalse(result);
+  }
+
+  @Test
+  void testGetCommitIdentifier() {
+    testTable.insertRows(3);
+    Snapshot snapshot = paimonTable.snapshotManager().latestSnapshot();
+
+    String result = conversionSource.getCommitIdentifier(snapshot);
+
+    assertNotNull(result);
+    assertEquals(String.valueOf(snapshot.commitIdentifier()), result);
+  }
+
+  @Test
+  void testCloseDoesNotThrowException() {
+    assertDoesNotThrow(() -> conversionSource.close());
+  }
+
+  @Test
+  void testConstructorInitializesFieldsCorrectly() {
+    assertNotNull(conversionSource);
+
+    testTable.insertRows(1);
+    assertDoesNotThrow(() -> conversionSource.getCurrentTable());
+  }
+
+  @Test
+  void testMultipleSnapshots() {
+    testTable.insertRows(2);
+    Snapshot firstSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(firstSnapshot);
+
+    testTable.insertRows(3);
+    Snapshot secondSnapshot = paimonTable.snapshotManager().latestSnapshot();
+    assertNotNull(secondSnapshot);
+
+    assertNotEquals(firstSnapshot.id(), secondSnapshot.id());
+
+    InternalTable firstTable = conversionSource.getTable(firstSnapshot);
+    InternalTable secondTable = conversionSource.getTable(secondSnapshot);
+
+    assertNotNull(firstTable);
+    assertNotNull(secondTable);
+    assertEquals(firstTable.getName(), secondTable.getName());
+    assertEquals(firstTable.getTableFormat(), secondTable.getTableFormat());
+  }
+
+  @Test
+  void testSchemaEvolution() {
+    testTable.insertRows(2);
+
+    GenericTable<?, String> tableWithExtraColumns =
+        TestPaimonTable.createTable("table_with_extra", "level", tempDir, hadoopConf, true);
+    FileStoreTable extraColumnsPaimonTable =
+        ((TestPaimonTable) tableWithExtraColumns).getPaimonTable();
+    PaimonConversionSource extraColumnsSource =
+        new PaimonConversionSource(extraColumnsPaimonTable);
+
+    tableWithExtraColumns.insertRows(2);
+
+    InternalTable originalTable = conversionSource.getCurrentTable();
+    InternalTable expandedTable = extraColumnsSource.getCurrentTable();
+
+    assertNotNull(originalTable);
+    assertNotNull(expandedTable);
+
+    assertTrue(
+        expandedTable.getReadSchema().getFields().size()
+            >= originalTable.getReadSchema().getFields().size());
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
new file mode 100644
index 00000000..9f906516
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonDataFileExtractor.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.storage.InternalDataFile;
+
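+/** Unit tests for {@link PaimonDataFileExtractor}, which lists data files from Paimon snapshots. */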
+public class TestPaimonDataFileExtractor {
+  private static final PaimonDataFileExtractor extractor = PaimonDataFileExtractor.getInstance();
+
+  @TempDir private Path tempDir;
+  private TestPaimonTable testTable;
+  private FileStoreTable paimonTable;
+  private InternalSchema testSchema;
+
+  @Test
+  void testToInternalDataFilesWithUnpartitionedTable() {
+    createUnpartitionedTable();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), testSchema);
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
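+    // Paimon lays data files out under per-bucket directories, hence the "bucket-" path segment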
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+    assertEquals(0, dataFile.getPartitionValues().size());
+  }
+
+  @Test
+  void testToInternalDataFilesWithPartitionedTable() {
+    createPartitionedTable();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), testSchema);
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getPhysicalPath().contains("bucket-"));
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+    assertNotNull(dataFile.getPartitionValues());
+  }
+
+  @Test
+  void testToInternalDataFilesWithTableWithPrimaryKeys() {
+    createTableWithPrimaryKeys();
+
+    // Insert some data to create files
+    testTable.insertRows(5);
+
+    // Get the latest snapshot
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), testSchema);
+
+    assertNotNull(result);
+    assertFalse(result.isEmpty());
+
+    InternalDataFile dataFile = result.get(0);
+    assertNotNull(dataFile.getPhysicalPath());
+    assertTrue(dataFile.getFileSizeBytes() > 0);
+    assertEquals(5, dataFile.getRecordCount());
+  }
+
+  @Test
+  void testPhysicalPathFormat() {
+    createUnpartitionedTable();
+
+    // Insert data
+    testTable.insertRows(2);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), testSchema);
+
+    assertFalse(result.isEmpty());
+
+    for (InternalDataFile dataFile : result) {
+      String path = dataFile.getPhysicalPath();
+      assertTrue(path.contains("bucket-"));
+      assertTrue(path.endsWith(".orc") || path.endsWith(".parquet"));
+    }
+  }
+
+  @Test
+  void testColumnStatsAreEmpty() {
+    createUnpartitionedTable();
+
+    testTable.insertRows(1);
+
+    List<InternalDataFile> result =
+        extractor.toInternalDataFiles(
+            paimonTable, paimonTable.snapshotManager().latestSnapshot(), testSchema);
+
+    assertFalse(result.isEmpty());
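+    // The extractor does not populate column stats, so each file should report none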
+    for (InternalDataFile dataFile : result) {
+      assertEquals(0, dataFile.getColumnStats().size());
+    }
+  }
+
+  private void createUnpartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+    testSchema =
+        InternalSchema.builder().build(); // empty schema won't matter for non-partitioned tables
+  }
+
+  private void createPartitionedTable() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", "level", tempDir, new Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+
+    // just the partition field matters for this test
+    InternalField partitionField =
+        InternalField.builder()
+            .name("level")
+            .schema(InternalSchema.builder().dataType(InternalType.STRING).build())
+            .build();
+
+    testSchema =
+        InternalSchema.builder().fields(Collections.singletonList(partitionField)).build();
+  }
+
+  private void createTableWithPrimaryKeys() {
+    testTable =
+        (TestPaimonTable)
+            TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), false);
+    paimonTable = testTable.getPaimonTable();
+    testSchema =
+        InternalSchema.builder().build(); // empty schema won't matter for non-partitioned tables
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
new file mode 100644
index 00000000..248c9f6f
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonPartitionExtractor.java
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryRowWriter;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.table.FileStoreTable;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import org.apache.xtable.TestPaimonTable;
+import org.apache.xtable.exception.ReadException;
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalPartitionField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+import org.apache.xtable.model.schema.PartitionTransformType;
+import org.apache.xtable.model.stat.PartitionValue;
+import org.apache.xtable.model.stat.Range;
+
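+/** Unit tests for {@link PaimonPartitionExtractor} partition-field and partition-path handling. */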
+public class TestPaimonPartitionExtractor {
+  private static final PaimonPartitionExtractor extractor = PaimonPartitionExtractor.getInstance();
+
+  @TempDir private Path tempDir;
+
+  @Test
+  void testToInternalPartitionFieldsWithEmptyKeys() {
+    InternalSchema schema = createMockSchema();
+
+    List<InternalPartitionField> result = extractor.toInternalPartitionFields(null, schema);
+    assertEquals(Collections.emptyList(), result);
+
+    result = extractor.toInternalPartitionFields(Collections.emptyList(), schema);
+    assertEquals(Collections.emptyList(), result);
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithSingleKey() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Collections.singletonList("level");
+
+    List<InternalPartitionField> result =
+        extractor.toInternalPartitionFields(partitionKeys, schema);
+
+    assertEquals(1, result.size());
+    InternalPartitionField partitionField = result.get(0);
+    assertEquals("level", partitionField.getSourceField().getName());
+    assertEquals(PartitionTransformType.VALUE, partitionField.getTransformType());
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithMultipleKeys() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Arrays.asList("level", "status");
+
+    List<InternalPartitionField> result =
+        extractor.toInternalPartitionFields(partitionKeys, schema);
+
+    assertEquals(2, result.size());
+    assertEquals("level", result.get(0).getSourceField().getName());
+    assertEquals("status", result.get(1).getSourceField().getName());
+    assertEquals(PartitionTransformType.VALUE, result.get(0).getTransformType());
+    assertEquals(PartitionTransformType.VALUE, result.get(1).getTransformType());
+  }
+
+  @Test
+  void testToInternalPartitionFieldsWithMissingKey() {
+    InternalSchema schema = createMockSchema();
+    List<String> partitionKeys = Collections.singletonList("missing_key");
+
+    ReadException exception =
+        assertThrows(
+            ReadException.class, () -> extractor.toInternalPartitionFields(partitionKeys, schema));
+
+    assertTrue(exception.getMessage().contains("Partition key not found in schema: missing_key"));
+  }
+
+  @Test
+  void testToPartitionValuesWithPartitionedTable() {
+    TestPaimonTable testTable = createPartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
+    testTable.insertRows(1);
+
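+    // Build a single-column binary partition row matching the table's "level" partition key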
+    BinaryRow partition = BinaryRow.singleColumn("INFO");
+
+    InternalSchema schema = createMockSchema();
+    List<PartitionValue> result = extractor.toPartitionValues(paimonTable, partition, schema);
+
+    assertEquals(1, result.size());
+    PartitionValue partitionValue = result.get(0);
+    assertEquals("level", partitionValue.getPartitionField().getSourceField().getName());
+    assertEquals(Range.scalar("INFO"), partitionValue.getRange());
+  }
+
+  @Test
+  @Disabled("TODO: make it easier to create multi-partitioned table in tests")
+  void testToPartitionPathWithMultiplePartitionValues() {
+    // TODO this table is fixed at a single partition; we need a way to create a multi-partitioned table
+    TestPaimonTable testTable = createPartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
+    BinaryRow partition = new BinaryRow(2);
+    BinaryRowWriter writer = new BinaryRowWriter(partition);
+    writer.writeString(0, BinaryString.fromString("INFO"));
+    writer.writeString(1, BinaryString.fromString("active"));
+    writer.complete();
+
+    Optional<String> result = extractor.toPartitionPath(paimonTable, partition);
+
+    assertTrue(result.isPresent());
+    assertEquals("level=INFO/status=active", result.get());
+  }
+
+  @Test
+  void testToPartitionPathWithEmptyPartitions() {
+    TestPaimonTable testTable = createUnpartitionedTable();
+    FileStoreTable paimonTable = testTable.getPaimonTable();
+
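+    // An empty partition row should produce no partition path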
+    BinaryRow emptyPartition = BinaryRow.EMPTY_ROW;
+
+    Optional<String> result = extractor.toPartitionPath(paimonTable, emptyPartition);
+
+    assertFalse(result.isPresent());
+  }
+
+  private InternalSchema createMockSchema() {
+    InternalField levelField =
+        InternalField.builder()
+            .name("level")
+            .schema(
+                InternalSchema.builder()
+                    .name("STRING")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .build();
+
+    InternalField statusField =
+        InternalField.builder()
+            .name("status")
+            .schema(
+                InternalSchema.builder()
+                    .name("STRING")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .build();
+
+    return InternalSchema.builder()
+        .name("test_schema")
+        .dataType(InternalType.RECORD)
+        .fields(Arrays.asList(levelField, statusField))
+        .build();
+  }
+
+  private TestPaimonTable createPartitionedTable() {
+    return (TestPaimonTable)
+        TestPaimonTable.createTable("test_table", "level", tempDir, new Configuration(), false);
+  }
+
+  private TestPaimonTable createUnpartitionedTable() {
+    return (TestPaimonTable)
+        TestPaimonTable.createTable("test_table", null, tempDir, new Configuration(), false);
+  }
+}
diff --git a/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
new file mode 100644
index 00000000..77f0ece0
--- /dev/null
+++ b/xtable-core/src/test/java/org/apache/xtable/paimon/TestPaimonSchemaExtractor.java
@@ -0,0 +1,547 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+package org.apache.xtable.paimon;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.paimon.schema.TableSchema;
+import org.apache.paimon.types.ArrayType;
+import org.apache.paimon.types.BigIntType;
+import org.apache.paimon.types.BooleanType;
+import org.apache.paimon.types.CharType;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.DataType;
+import org.apache.paimon.types.DateType;
+import org.apache.paimon.types.DecimalType;
+import org.apache.paimon.types.DoubleType;
+import org.apache.paimon.types.FloatType;
+import org.apache.paimon.types.IntType;
+import org.apache.paimon.types.MapType;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.types.SmallIntType;
+import org.apache.paimon.types.TimestampType;
+import org.apache.paimon.types.TinyIntType;
+import org.apache.paimon.types.VarCharType;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+
+import org.apache.xtable.model.schema.InternalField;
+import org.apache.xtable.model.schema.InternalSchema;
+import org.apache.xtable.model.schema.InternalType;
+
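+/** Unit tests for {@link PaimonSchemaExtractor}, mapping Paimon types to the internal schema model. */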
+public class TestPaimonSchemaExtractor {
+  private static final PaimonSchemaExtractor schemaExtractor = PaimonSchemaExtractor.getInstance();
+
+  private void assertField(DataField paimonField, InternalField expectedInternalField) {
+    assertField(paimonField, expectedInternalField, Collections.emptyList());
+  }
+
+  private void assertField(
+      DataField paimonField, InternalField expectedInternalField, List<String> primaryKeys) {
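+    // TableSchema args: schemaId, fields, highestFieldId, partitionKeys, primaryKeys, options, comment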
+    TableSchema paimonSchema =
+        new TableSchema(
+            0,
+            Collections.singletonList(paimonField),
+            0,
+            Collections.emptyList(),
+            primaryKeys,
+            new HashMap<>(),
+            "");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+    List<InternalField> recordKeyFields =
+        primaryKeys.isEmpty()
+            ? Collections.emptyList()
+            : Collections.singletonList(expectedInternalField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(Collections.singletonList(expectedInternalField))
+            .recordKeyFields(recordKeyFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+
+  @Test
+  void testCharField() {
+    DataField paimonField = new DataField(0, "char_field", new CharType(10));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("char_field")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("CHAR(10)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testVarcharField() {
+    DataField paimonField = new DataField(1, "varchar_field", new VarCharType(255));
+    InternalField expectedField =
+        InternalField.builder()
+            .name("varchar_field")
+            .fieldId(1)
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBooleanField() {
+    DataField paimonField = new DataField(2, "boolean_field", new BooleanType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("boolean_field")
+            .fieldId(2)
+            .schema(
+                InternalSchema.builder()
+                    .name("BOOLEAN")
+                    .dataType(InternalType.BOOLEAN)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testTinyIntField() {
+    DataField paimonField = new DataField(3, "tinyint_field", new TinyIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("tinyint_field")
+            .fieldId(3)
+            .schema(
+                InternalSchema.builder()
+                    .name("TINYINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testSmallIntField() {
+    DataField paimonField = new DataField(4, "smallint_field", new SmallIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("smallint_field")
+            .fieldId(4)
+            .schema(
+                InternalSchema.builder()
+                    .name("SMALLINT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testIntField() {
+    DataField paimonField = new DataField(5, "int_field", new IntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("int_field")
+            .fieldId(5)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testBigIntField() {
+    DataField paimonField = new DataField(6, "bigint_field", new BigIntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("bigint_field")
+            .fieldId(6)
+            .schema(
+                InternalSchema.builder()
+                    .name("BIGINT")
+                    .dataType(InternalType.LONG)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testFloatField() {
+    DataField paimonField = new DataField(7, "float_field", new FloatType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("float_field")
+            .fieldId(7)
+            .schema(
+                InternalSchema.builder()
+                    .name("FLOAT")
+                    .dataType(InternalType.FLOAT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDoubleField() {
+    DataField paimonField = new DataField(8, "double_field", new DoubleType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("double_field")
+            .fieldId(8)
+            .schema(
+                InternalSchema.builder()
+                    .name("DOUBLE")
+                    .dataType(InternalType.DOUBLE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testDateField() {
+    DataField paimonField = new DataField(9, "date_field", new DateType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("date_field")
+            .fieldId(9)
+            .schema(
+                InternalSchema.builder()
+                    .name("DATE")
+                    .dataType(InternalType.DATE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9})
+  void testTimestampField(int precision) {
+    DataField paimonField = new DataField(10, "timestamp_field", new TimestampType(precision));
+
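+    // Paimon precisions 0-3 are expected to map to MILLIS, 4-6 to MICROS, and 7-9 to NANOS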
+    InternalSchema.MetadataValue expectedPrecision;
+    if (precision <= 3) {
+      expectedPrecision = InternalSchema.MetadataValue.MILLIS;
+    } else if (precision <= 6) {
+      expectedPrecision = InternalSchema.MetadataValue.MICROS;
+    } else {
+      expectedPrecision = InternalSchema.MetadataValue.NANOS;
+    }
+
+    Map<InternalSchema.MetadataKey, Object> timestampMetadata =
+        Collections.singletonMap(InternalSchema.MetadataKey.TIMESTAMP_PRECISION, expectedPrecision);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("timestamp_field")
+            .fieldId(10)
+            .schema(
+                InternalSchema.builder()
+                    .name("TIMESTAMP(" + precision + ")")
+                    .dataType(InternalType.TIMESTAMP)
+                    .isNullable(true)
+                    .metadata(timestampMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @ParameterizedTest
+  @ValueSource(ints = {-1, 10})
+  void testInvalidTimestampPrecisionField(int invalidPrecision) {
+    assertThrows(IllegalArgumentException.class, () -> new TimestampType(invalidPrecision));
+  }
+
+  @Test
+  void testDecimalField() {
+    DataField paimonField = new DataField(11, "decimal_field", new DecimalType(10, 2));
+    Map<InternalSchema.MetadataKey, Object> decimalMetadata = new HashMap<>();
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_PRECISION, 10);
+    decimalMetadata.put(InternalSchema.MetadataKey.DECIMAL_SCALE, 2);
+    InternalField expectedField =
+        InternalField.builder()
+            .name("decimal_field")
+            .fieldId(11)
+            .schema(
+                InternalSchema.builder()
+                    .name("DECIMAL(10, 2)")
+                    .dataType(InternalType.DECIMAL)
+                    .isNullable(true)
+                    .metadata(decimalMetadata)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testStructField() {
+    DataField paimonField =
+        new DataField(
+            12,
+            "struct_field",
+            RowType.of(
+                new DataType[] {
+                  new IntType(),
+                  new VarCharType(255),
+                  RowType.of(new DataType[] {new DoubleType()}, new String[] {"very_nested_double"})
+                },
+                new String[] {"nested_int", "nested_varchar", "nested_struct"}));
+    InternalField nestedIntField =
+        InternalField.builder()
+            .name("nested_int")
+            .fieldId(0)
+            .parentPath("struct_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField nestedVarcharField =
+        InternalField.builder()
+            .name("nested_varchar")
+            .fieldId(1)
+            .parentPath("struct_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField veryNestedDoubleField =
+        InternalField.builder()
+            .name("very_nested_double")
+            .fieldId(0)
+            .parentPath("struct_field.nested_struct")
+            .schema(
+                InternalSchema.builder()
+                    .name("DOUBLE")
+                    .dataType(InternalType.DOUBLE)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField nestedStructField =
+        InternalField.builder()
+            .name("nested_struct")
+            .fieldId(2)
+            .parentPath("struct_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("ROW<`very_nested_double` DOUBLE>")
+                    .dataType(InternalType.RECORD)
+                    .isNullable(true)
+                    .fields(Collections.singletonList(veryNestedDoubleField))
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField expectedField =
+        InternalField.builder()
+            .name("struct_field")
+            .fieldId(12)
+            .schema(
+                InternalSchema.builder()
+                    .name(
+                        "ROW<`nested_int` INT, `nested_varchar` VARCHAR(255), `nested_struct` ROW<`very_nested_double` DOUBLE>>")
+                    .dataType(InternalType.RECORD)
+                    .isNullable(true)
+                    .fields(Arrays.asList(nestedIntField, nestedVarcharField, nestedStructField))
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testArrayField() {
+    DataField paimonField = new DataField(13, "array_field", new ArrayType(new IntType()));
+    InternalField arrayElementField =
+        InternalField.builder()
+            .name("_one_field_element")
+            .parentPath("array_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .build();
+    InternalField expectedField =
+        InternalField.builder()
+            .name("array_field")
+            .fieldId(13)
+            .schema(
+                InternalSchema.builder()
+                    .name("ARRAY<INT>")
+                    .dataType(InternalType.LIST)
+                    .isNullable(true)
+                    .fields(Collections.singletonList(arrayElementField))
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testMapField() {
+    DataField paimonField =
+        new DataField(14, "map_field", new MapType(new VarCharType(255), new IntType()));
+    InternalField mapKeyField =
+        InternalField.builder()
+            .name("_one_field_key")
+            .parentPath("map_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(false)
+                    .build())
+            .build();
+    InternalField mapValueField =
+        InternalField.builder()
+            .name("_one_field_value")
+            .parentPath("map_field")
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .build();
+    InternalField expectedField =
+        InternalField.builder()
+            .name("map_field")
+            .fieldId(14)
+            .schema(
+                InternalSchema.builder()
+                    .name("MAP<VARCHAR(255), INT>")
+                    .dataType(InternalType.MAP)
+                    .isNullable(true)
+                    .fields(Arrays.asList(mapKeyField, mapValueField))
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField);
+  }
+
+  @Test
+  void testPrimaryKey() {
+    DataField paimonField = new DataField(0, "pk_field", new IntType());
+    InternalField expectedField =
+        InternalField.builder()
+            .name("pk_field")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    assertField(paimonField, expectedField, Collections.singletonList("pk_field"));
+  }
+
+  @Test
+  void testMultiplePrimaryKeys() {
+    DataField intField = new DataField(0, "int_pk", new IntType());
+    DataField stringField = new DataField(1, "string_pk", new VarCharType(255));
+    List<DataField> paimonFields = Arrays.asList(intField, stringField);
+    List<String> primaryKeys = Arrays.asList("int_pk", "string_pk");
+    TableSchema paimonSchema =
+        new TableSchema(
+            0, paimonFields, 0, Collections.emptyList(), primaryKeys, new HashMap<>(), "");
+    InternalSchema internalSchema = schemaExtractor.toInternalSchema(paimonSchema);
+
+    InternalField expectedIntField =
+        InternalField.builder()
+            .name("int_pk")
+            .fieldId(0)
+            .schema(
+                InternalSchema.builder()
+                    .name("INT")
+                    .dataType(InternalType.INT)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    InternalField expectedStringField =
+        InternalField.builder()
+            .name("string_pk")
+            .fieldId(1)
+            .schema(
+                InternalSchema.builder()
+                    .name("VARCHAR(255)")
+                    .dataType(InternalType.STRING)
+                    .isNullable(true)
+                    .build())
+            .defaultValue(InternalField.Constants.NULL_DEFAULT_VALUE)
+            .build();
+    List<InternalField> expectedFields = Arrays.asList(expectedIntField, expectedStringField);
+    InternalSchema expectedSchema =
+        InternalSchema.builder()
+            .name("record")
+            .dataType(InternalType.RECORD)
+            .fields(expectedFields)
+            .recordKeyFields(expectedFields)
+            .build();
+    assertEquals(expectedSchema, internalSchema);
+  }
+}
diff --git a/xtable-service/pom.xml b/xtable-service/pom.xml
index 381aa3d0..ee4854d2 100644
--- a/xtable-service/pom.xml
+++ b/xtable-service/pom.xml
@@ -216,6 +216,16 @@
             <scope>test</scope>
         </dependency>
 
+        <!-- Paimon dependencies -->
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-bundle</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.paimon</groupId>
+            <artifactId>paimon-spark-${spark.version.prefix}</artifactId>
+            <scope>test</scope>
+        </dependency>
 
         <dependency>
             <groupId>org.apache.parquet</groupId>
diff --git a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
index 0e7a7e26..c87faee7 100644
--- a/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
+++ b/xtable-service/src/test/java/org/apache/xtable/service/ITConversionService.java
@@ -22,6 +22,7 @@ import static org.apache.xtable.GenericTable.getTableName;
 import static org.apache.xtable.model.storage.TableFormat.DELTA;
 import static org.apache.xtable.model.storage.TableFormat.HUDI;
 import static org.apache.xtable.model.storage.TableFormat.ICEBERG;
+import static org.apache.xtable.model.storage.TableFormat.PAIMON;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,12 +92,14 @@ public class ITConversionService {
       Files.createDirectories(basePath);
 
       SparkConf sparkConf = HudiTestUtil.getSparkConf(tempDir);
+
       sparkSession =
           SparkSession.builder().config(HoodieReadClient.addHoodieSupport(sparkConf)).getOrCreate();
       sparkSession
           .sparkContext()
           .hadoopConfiguration()
           .set("parquet.avro.write-old-list-structure", "false");
+
       jsc = JavaSparkContext.fromSparkContext(sparkSession.sparkContext());
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -116,14 +119,18 @@ public class ITConversionService {
         new DeltaConversionSourceProvider();
     ConversionSourceProvider<org.apache.iceberg.Snapshot> icebergConversionSourceProvider =
         new IcebergConversionSourceProvider();
+    ConversionSourceProvider<org.apache.paimon.Snapshot> paimonConversionSourceProvider =
+        new org.apache.xtable.paimon.PaimonConversionSourceProvider();
 
     hudiConversionSourceProvider.init(jsc.hadoopConfiguration());
     deltaConversionSourceProvider.init(jsc.hadoopConfiguration());
     icebergConversionSourceProvider.init(jsc.hadoopConfiguration());
+    paimonConversionSourceProvider.init(jsc.hadoopConfiguration());
 
     sourceProviders.put(HUDI, hudiConversionSourceProvider);
     sourceProviders.put(DELTA, deltaConversionSourceProvider);
     sourceProviders.put(ICEBERG, icebergConversionSourceProvider);
+    sourceProviders.put(PAIMON, paimonConversionSourceProvider);
 
     this.conversionService =
         new ConversionService(
@@ -232,7 +239,7 @@ public class ITConversionService {
 
   private static Stream<Arguments> generateTestParametersFormatsAndPartitioning() {
     List<Arguments> arguments = new ArrayList<>();
-    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG)) {
+    for (String sourceTableFormat : Arrays.asList(HUDI, DELTA, ICEBERG, PAIMON)) {
       for (boolean isPartitioned : new boolean[] {true, false}) {
         arguments.add(Arguments.of(sourceTableFormat, isPartitioned));
       }
@@ -243,6 +250,7 @@ public class ITConversionService {
   protected static List<String> getOtherFormats(String sourceTableFormat) {
     return Arrays.stream(TableFormat.values())
         .filter(format -> !format.equals(sourceTableFormat))
+        .filter(format -> !format.equals(PAIMON)) // Paimon target not supported yet
         .collect(Collectors.toList());
   }
 
