This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new d9a7451d5c [spark] Support partitioned format table (#5502)
d9a7451d5c is described below

commit d9a7451d5caae9d41242ea88af2bfbaaf190a9f8
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 21 21:05:29 2025 +0800

    [spark] Support partitioned format table (#5502)
---
 .../java/org/apache/paimon/hive/HiveCatalog.java   |   7 +-
 .../hive/Hive23CatalogFormatTableITCase.java       |   6 +
 .../apache/paimon/spark/sql/FormatTableTest.scala  |  21 +++
 .../apache/paimon/spark/sql/FormatTableTest.scala  |  21 +++
 .../apache/paimon/spark/sql/FormatTableTest.scala  |  21 +++
 .../apache/paimon/spark/sql/FormatTableTest.scala  |  21 +++
 .../apache/paimon/spark/sql/FormatTableTest.scala  |  21 +++
 .../java/org/apache/paimon/spark/SparkCatalog.java |  32 ++--
 .../spark/sql/execution/PaimonFormatTable.scala    | 196 +++++++++++++++++++++
 .../paimon/spark/SparkCatalogWithHiveTest.java     |  22 ---
 .../paimon/spark/sql/FormatTableTestBase.scala     |  58 ++++++
 11 files changed, 391 insertions(+), 35 deletions(-)

diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 164b27db56..1ef319fb31 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1467,7 +1467,12 @@ public class HiveCatalog extends AbstractCatalog {
         sd.setSerdeInfo(serDeInfo);
 
         CoreOptions options = new CoreOptions(schema.options());
-        if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
+        boolean partitioned = !schema.partitionKeys().isEmpty();
+        // Always treat partitioned format table as partitioned table in metastore
+        if (provider == null && !options.partitionedTableInMetastore()) {
+            partitioned = false;
+        }
+        if (partitioned) {
             Map<String, DataField> fieldMap =
                     schema.fields().stream()
                             .collect(Collectors.toMap(DataField::name, Function.identity()));
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
index d2e277dd22..0e77a6b67e 100644
--- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
@@ -49,4 +49,10 @@ public class Hive23CatalogFormatTableITCase extends HiveCatalogFormatTableITCase
     public void testPartitionTable() {
         // Need to specify partition columns because the destination table is partitioned.
     }
+
+    @Override
+    @Test
+    public void testFlinkCreatePartitionTable() {
+        // Need to specify partition columns because the destination table is partitioned.
+    }
 }
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index f6e29d7651..c323d86dfd 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -31,6 +31,7 @@ import org.apache.paimon.spark.catalog.SupportFunction;
 import org.apache.paimon.spark.catalog.SupportView;
 import org.apache.paimon.table.FormatTable;
 import org.apache.paimon.table.FormatTableOptions;
+import org.apache.paimon.utils.TypeUtils;
 
 import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -45,15 +46,15 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
 import org.apache.spark.sql.connector.expressions.IdentityTransform;
 import org.apache.spark.sql.connector.expressions.NamedReference;
 import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.PartitionedCSVTable;
+import org.apache.spark.sql.execution.PartitionedJsonTable;
+import org.apache.spark.sql.execution.PartitionedOrcTable;
+import org.apache.spark.sql.execution.PartitionedParquetTable;
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
 import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
 import org.apache.spark.sql.execution.datasources.v2.FileTable;
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
-import org.apache.spark.sql.execution.datasources.v2.json.JsonTable;
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -464,6 +465,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
 
     private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
         StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+        StructType partitionSchema =
+                SparkTypeUtils.fromPaimonRowType(
+                        TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
         List<String> pathList = new ArrayList<>();
         pathList.add(formatTable.location());
         Options options = Options.fromMap(formatTable.options());
@@ -471,37 +475,41 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
         if (formatTable.format() == FormatTable.Format.CSV) {
             options.set("sep", options.get(FormatTableOptions.FIELD_DELIMITER));
             dsOptions = new CaseInsensitiveStringMap(options.toMap());
-            return new CSVTable(
+            return new PartitionedCSVTable(
                     ident.name(),
                     SparkSession.active(),
                     dsOptions,
                     scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
-                    CSVFileFormat.class);
+                    CSVFileFormat.class,
+                    partitionSchema);
         } else if (formatTable.format() == FormatTable.Format.ORC) {
-            return new OrcTable(
+            return new PartitionedOrcTable(
                     ident.name(),
                     SparkSession.active(),
                     dsOptions,
                     scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
-                    OrcFileFormat.class);
+                    OrcFileFormat.class,
+                    partitionSchema);
         } else if (formatTable.format() == FormatTable.Format.PARQUET) {
-            return new ParquetTable(
+            return new PartitionedParquetTable(
                     ident.name(),
                     SparkSession.active(),
                     dsOptions,
                     scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
-                    ParquetFileFormat.class);
+                    ParquetFileFormat.class,
+                    partitionSchema);
         } else if (formatTable.format() == FormatTable.Format.JSON) {
-            return new JsonTable(
+            return new PartitionedJsonTable(
                     ident.name(),
                     SparkSession.active(),
                     dsOptions,
                     scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
                     scala.Option.apply(schema),
-                    JsonFileFormat.class);
+                    JsonFileFormat.class,
+                    partitionSchema);
         } else {
             throw new UnsupportedOperationException(
                     "Unsupported format table "
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
new file mode 100644
index 0000000000..842009b4a5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
+import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+object PaimonFormatTable {
+
+  // Copy from spark and override FileIndex's partitionSchema
+  def createFileIndex(
+      options: CaseInsensitiveStringMap,
+      sparkSession: SparkSession,
+      paths: Seq[String],
+      userSpecifiedSchema: Option[StructType],
+      partitionSchema: StructType): PartitioningAwareFileIndex = {
+
+    def globPaths: Boolean = {
+      val entry = options.get(DataSource.GLOB_PATHS_KEY)
+      Option(entry).map(_ == "true").getOrElse(true)
+    }
+
+    val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+    // Hadoop Configurations are case-sensitive.
+    val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+    if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+      // We are reading from the results of a streaming query. We will load files from
+      // the metadata log instead of listing them using HDFS APIs.
+      new PartitionedMetadataLogFileIndex(
+        sparkSession,
+        new Path(paths.head),
+        options.asScala.toMap,
+        userSpecifiedSchema,
+        partitionSchema = partitionSchema)
+    } else {
+      // This is a non-streaming file based datasource.
+      val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+        paths,
+        hadoopConf,
+        checkEmptyGlobPath = true,
+        checkFilesExist = true,
+        enableGlobbing = globPaths)
+      val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+      new PartitionedInMemoryFileIndex(
+        sparkSession,
+        rootPathsSpecified,
+        caseSensitiveMap,
+        userSpecifiedSchema,
+        fileStatusCache,
+        partitionSchema = partitionSchema)
+    }
+  }
+
+  // Extend from MetadataLogFileIndex to override partitionSchema
+  private class PartitionedMetadataLogFileIndex(
+      sparkSession: SparkSession,
+      path: Path,
+      parameters: Map[String, String],
+      userSpecifiedSchema: Option[StructType],
+      override val partitionSchema: StructType)
+    extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+
+  // Extend from InMemoryFileIndex to override partitionSchema
+  private class PartitionedInMemoryFileIndex(
+      sparkSession: SparkSession,
+      rootPathsSpecified: Seq[Path],
+      parameters: Map[String, String],
+      userSpecifiedSchema: Option[StructType],
+      fileStatusCache: FileStatusCache = NoopCache,
+      userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+      metadataOpsTimeNs: Option[Long] = None,
+      override val partitionSchema: StructType)
+    extends InMemoryFileIndex(
+      sparkSession,
+      rootPathsSpecified,
+      parameters,
+      userSpecifiedSchema,
+      fileStatusCache,
+      userSpecifiedPartitionSpec,
+      metadataOpsTimeNs)
+}
+
+// Paimon Format Table
+
+class PartitionedCSVTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat],
+    partitionSchema: StructType
+) extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    PaimonFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      userSpecifiedSchema,
+      partitionSchema)
+  }
+}
+
+class PartitionedOrcTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat],
+    partitionSchema: StructType
+) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    PaimonFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      userSpecifiedSchema,
+      partitionSchema)
+  }
+}
+
+class PartitionedParquetTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat],
+    partitionSchema: StructType
+) extends ParquetTable(
+    name,
+    sparkSession,
+    options,
+    paths,
+    userSpecifiedSchema,
+    fallbackFileFormat) {
+
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    PaimonFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      userSpecifiedSchema,
+      partitionSchema)
+  }
+}
+
+class PartitionedJsonTable(
+    name: String,
+    sparkSession: SparkSession,
+    options: CaseInsensitiveStringMap,
+    paths: Seq[String],
+    userSpecifiedSchema: Option[StructType],
+    fallbackFileFormat: Class[_ <: FileFormat],
+    partitionSchema: StructType)
+  extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+  override lazy val fileIndex: PartitioningAwareFileIndex = {
+    PaimonFormatTable.createFileIndex(
+      options,
+      sparkSession,
+      paths,
+      userSpecifiedSchema,
+      partitionSchema)
+  }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 70b2ef186e..444fb1024d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -27,7 +27,6 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.io.TempDir;
 
@@ -174,25 +173,4 @@ public class SparkCatalogWithHiveTest {
 
         spark.close();
     }
-
-    @Disabled
-    @Test
-    public void testPartitionedFormatTable() {
-        SparkSession spark = createSessionBuilder().getOrCreate();
-        spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
-        spark.sql("USE spark_catalog.test_db");
-
-        spark.sql(
-                "CREATE TABLE part_format_table (a INT, b INT, dt STRING) USING CSV PARTITIONED BY (dt)");
-        spark.sql("INSERT INTO TABLE part_format_table VALUES (1, 1, '1'), (2, 2, '2')");
-        assertThat(
-                        spark.sql("SELECT * FROM part_format_table").collectAsList().stream()
-                                .map(Row::toString)
-                                .collect(Collectors.toList()))
-                .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
-        // TODO assert partition paths, it should be true partitioned table
-
-        spark.close();
-    }
 }
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
new file mode 100644
index 0000000000..6bd54a26e3
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonHiveTestBase
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.Row
+
+abstract class FormatTableTestBase extends PaimonHiveTestBase {
+
+  override protected def beforeEach(): Unit = {
+    sql(s"USE $paimonHiveCatalogName")
+  }
+
+  test("Format table: write partitioned table") {
+    for (format <- Seq("csv", "orc", "parquet", "json")) {
+      withTable("t") {
+        sql(s"CREATE TABLE t (id INT, p1 INT, p2 INT) USING $format PARTITIONED BY (p1, p2)")
+        sql("INSERT INTO t VALUES (1, 2, 3)")
+
+        // check show create table
+        assert(
+          sql("SHOW CREATE TABLE t").collectAsList().toString.contains("PARTITIONED BY (p1, p2)"))
+
+        // check partition in file system
+        val table =
+          paimonCatalog.getTable(Identifier.create("default", "t")).asInstanceOf[FormatTable]
+        val dirs = table.fileIO().listStatus(new Path(table.location())).map(_.getPath.getName)
+        assert(dirs.count(_.startsWith("p1=")) == 1)
+
+        // check select
+        checkAnswer(sql("SELECT * FROM t"), Row(1, 2, 3))
+        checkAnswer(sql("SELECT id FROM t"), Row(1))
+        checkAnswer(sql("SELECT p1 FROM t"), Row(2))
+        checkAnswer(sql("SELECT p2 FROM t"), Row(3))
+      }
+    }
+  }
+}

Reply via email to