This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new d9a7451d5c [spark] Support partitioned format table (#5502)
d9a7451d5c is described below
commit d9a7451d5caae9d41242ea88af2bfbaaf190a9f8
Author: Zouxxyy <[email protected]>
AuthorDate: Mon Apr 21 21:05:29 2025 +0800
[spark] Support partitioned format table (#5502)
---
.../java/org/apache/paimon/hive/HiveCatalog.java | 7 +-
.../hive/Hive23CatalogFormatTableITCase.java | 6 +
.../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++
.../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++
.../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++
.../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++
.../apache/paimon/spark/sql/FormatTableTest.scala | 21 +++
.../java/org/apache/paimon/spark/SparkCatalog.java | 32 ++--
.../spark/sql/execution/PaimonFormatTable.scala | 196 +++++++++++++++++++++
.../paimon/spark/SparkCatalogWithHiveTest.java | 22 ---
.../paimon/spark/sql/FormatTableTestBase.scala | 58 ++++++
11 files changed, 391 insertions(+), 35 deletions(-)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index 164b27db56..1ef319fb31 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -1467,7 +1467,12 @@ public class HiveCatalog extends AbstractCatalog {
sd.setSerdeInfo(serDeInfo);
CoreOptions options = new CoreOptions(schema.options());
- if (options.partitionedTableInMetastore() && !schema.partitionKeys().isEmpty()) {
+ boolean partitioned = !schema.partitionKeys().isEmpty();
+ // Always treat partitioned format table as partitioned table in metastore
+ if (provider == null && !options.partitionedTableInMetastore()) {
+ partitioned = false;
+ }
+ if (partitioned) {
Map<String, DataField> fieldMap =
schema.fields().stream()
.collect(Collectors.toMap(DataField::name, Function.identity()));
diff --git a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
index d2e277dd22..0e77a6b67e 100644
--- a/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
+++ b/paimon-hive/paimon-hive-connector-2.3/src/test/java/org/apache/paimon/hive/Hive23CatalogFormatTableITCase.java
@@ -49,4 +49,10 @@ public class Hive23CatalogFormatTableITCase extends HiveCatalogFormatTableITCase
public void testPartitionTable() {
// Need to specify partition columns because the destination table is partitioned.
}
+
+ @Override
+ @Test
+ public void testFlinkCreatePartitionTable() {
+ // Need to specify partition columns because the destination table is partitioned.
+ }
}
diff --git a/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.2/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.3/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.4/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-3.5/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
new file mode 100644
index 0000000000..ba49976ab6
--- /dev/null
+++ b/paimon-spark/paimon-spark-4.0/src/test/scala/org/apache/paimon/spark/sql/FormatTableTest.scala
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+class FormatTableTest extends FormatTableTestBase {}
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index f6e29d7651..c323d86dfd 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -31,6 +31,7 @@ import org.apache.paimon.spark.catalog.SupportFunction;
import org.apache.paimon.spark.catalog.SupportView;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.FormatTableOptions;
+import org.apache.paimon.utils.TypeUtils;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalyst.analysis.NamespaceAlreadyExistsException;
@@ -45,15 +46,15 @@ import org.apache.spark.sql.connector.expressions.FieldReference;
import org.apache.spark.sql.connector.expressions.IdentityTransform;
import org.apache.spark.sql.connector.expressions.NamedReference;
import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.execution.PartitionedCSVTable;
+import org.apache.spark.sql.execution.PartitionedJsonTable;
+import org.apache.spark.sql.execution.PartitionedOrcTable;
+import org.apache.spark.sql.execution.PartitionedParquetTable;
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat;
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat;
import org.apache.spark.sql.execution.datasources.orc.OrcFileFormat;
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat;
import org.apache.spark.sql.execution.datasources.v2.FileTable;
-import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
-import org.apache.spark.sql.execution.datasources.v2.json.JsonTable;
-import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
-import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -464,6 +465,9 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
private static FileTable convertToFileTable(Identifier ident, FormatTable formatTable) {
StructType schema = SparkTypeUtils.fromPaimonRowType(formatTable.rowType());
+ StructType partitionSchema =
+ SparkTypeUtils.fromPaimonRowType(
+ TypeUtils.project(formatTable.rowType(), formatTable.partitionKeys()));
List<String> pathList = new ArrayList<>();
pathList.add(formatTable.location());
Options options = Options.fromMap(formatTable.options());
@@ -471,37 +475,41 @@ public class SparkCatalog extends SparkBaseCatalog implements SupportFunction, S
if (formatTable.format() == FormatTable.Format.CSV) {
options.set("sep", options.get(FormatTableOptions.FIELD_DELIMITER));
dsOptions = new CaseInsensitiveStringMap(options.toMap());
- return new CSVTable(
+ return new PartitionedCSVTable(
ident.name(),
SparkSession.active(),
dsOptions,
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
scala.Option.apply(schema),
- CSVFileFormat.class);
+ CSVFileFormat.class,
+ partitionSchema);
} else if (formatTable.format() == FormatTable.Format.ORC) {
- return new OrcTable(
+ return new PartitionedOrcTable(
ident.name(),
SparkSession.active(),
dsOptions,
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
scala.Option.apply(schema),
- OrcFileFormat.class);
+ OrcFileFormat.class,
+ partitionSchema);
} else if (formatTable.format() == FormatTable.Format.PARQUET) {
- return new ParquetTable(
+ return new PartitionedParquetTable(
ident.name(),
SparkSession.active(),
dsOptions,
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
scala.Option.apply(schema),
- ParquetFileFormat.class);
+ ParquetFileFormat.class,
+ partitionSchema);
} else if (formatTable.format() == FormatTable.Format.JSON) {
- return new JsonTable(
+ return new PartitionedJsonTable(
ident.name(),
SparkSession.active(),
dsOptions,
scala.collection.JavaConverters.asScalaBuffer(pathList).toSeq(),
scala.Option.apply(schema),
- JsonFileFormat.class);
+ JsonFileFormat.class,
+ partitionSchema);
} else {
throw new UnsupportedOperationException(
"Unsupported format table "
diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
new file mode 100644
index 0000000000..842009b4a5
--- /dev/null
+++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/spark/sql/execution/PaimonFormatTable.scala
@@ -0,0 +1,196 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution
+
+import org.apache.hadoop.fs.Path
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.execution.datasources._
+import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable
+import org.apache.spark.sql.execution.datasources.v2.json.JsonTable
+import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable
+import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable
+import org.apache.spark.sql.execution.streaming.{FileStreamSink, MetadataLogFileIndex}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
+
+import scala.collection.JavaConverters._
+
+object PaimonFormatTable {
+
+ // Copy from spark and override FileIndex's partitionSchema
+ def createFileIndex(
+ options: CaseInsensitiveStringMap,
+ sparkSession: SparkSession,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ partitionSchema: StructType): PartitioningAwareFileIndex = {
+
+ def globPaths: Boolean = {
+ val entry = options.get(DataSource.GLOB_PATHS_KEY)
+ Option(entry).map(_ == "true").getOrElse(true)
+ }
+
+ val caseSensitiveMap = options.asCaseSensitiveMap.asScala.toMap
+ // Hadoop Configurations are case-sensitive.
+ val hadoopConf = sparkSession.sessionState.newHadoopConfWithOptions(caseSensitiveMap)
+ if (FileStreamSink.hasMetadata(paths, hadoopConf, sparkSession.sessionState.conf)) {
+ // We are reading from the results of a streaming query. We will load files from
+ // the metadata log instead of listing them using HDFS APIs.
+ new PartitionedMetadataLogFileIndex(
+ sparkSession,
+ new Path(paths.head),
+ options.asScala.toMap,
+ userSpecifiedSchema,
+ partitionSchema = partitionSchema)
+ } else {
+ // This is a non-streaming file based datasource.
+ val rootPathsSpecified = DataSource.checkAndGlobPathIfNecessary(
+ paths,
+ hadoopConf,
+ checkEmptyGlobPath = true,
+ checkFilesExist = true,
+ enableGlobbing = globPaths)
+ val fileStatusCache = FileStatusCache.getOrCreate(sparkSession)
+
+ new PartitionedInMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ caseSensitiveMap,
+ userSpecifiedSchema,
+ fileStatusCache,
+ partitionSchema = partitionSchema)
+ }
+ }
+
+ // Extend from MetadataLogFileIndex to override partitionSchema
+ private class PartitionedMetadataLogFileIndex(
+ sparkSession: SparkSession,
+ path: Path,
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ override val partitionSchema: StructType)
+ extends MetadataLogFileIndex(sparkSession, path, parameters, userSpecifiedSchema)
+
+ // Extend from InMemoryFileIndex to override partitionSchema
+ private class PartitionedInMemoryFileIndex(
+ sparkSession: SparkSession,
+ rootPathsSpecified: Seq[Path],
+ parameters: Map[String, String],
+ userSpecifiedSchema: Option[StructType],
+ fileStatusCache: FileStatusCache = NoopCache,
+ userSpecifiedPartitionSpec: Option[PartitionSpec] = None,
+ metadataOpsTimeNs: Option[Long] = None,
+ override val partitionSchema: StructType)
+ extends InMemoryFileIndex(
+ sparkSession,
+ rootPathsSpecified,
+ parameters,
+ userSpecifiedSchema,
+ fileStatusCache,
+ userSpecifiedPartitionSpec,
+ metadataOpsTimeNs)
+}
+
+// Paimon Format Table
+
+class PartitionedCSVTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat],
+ partitionSchema: StructType
+) extends CSVTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ PaimonFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema)
+ }
+}
+
+class PartitionedOrcTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat],
+ partitionSchema: StructType
+) extends OrcTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ PaimonFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema)
+ }
+}
+
+class PartitionedParquetTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat],
+ partitionSchema: StructType
+) extends ParquetTable(
+ name,
+ sparkSession,
+ options,
+ paths,
+ userSpecifiedSchema,
+ fallbackFileFormat) {
+
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ PaimonFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema)
+ }
+}
+
+class PartitionedJsonTable(
+ name: String,
+ sparkSession: SparkSession,
+ options: CaseInsensitiveStringMap,
+ paths: Seq[String],
+ userSpecifiedSchema: Option[StructType],
+ fallbackFileFormat: Class[_ <: FileFormat],
+ partitionSchema: StructType)
+ extends JsonTable(name, sparkSession, options, paths, userSpecifiedSchema, fallbackFileFormat) {
+
+ override lazy val fileIndex: PartitioningAwareFileIndex = {
+ PaimonFormatTable.createFileIndex(
+ options,
+ sparkSession,
+ paths,
+ userSpecifiedSchema,
+ partitionSchema)
+ }
+}
diff --git a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 70b2ef186e..444fb1024d 100644
--- a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -27,7 +27,6 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
@@ -174,25 +173,4 @@ public class SparkCatalogWithHiveTest {
spark.close();
}
-
- @Disabled
- @Test
- public void testPartitionedFormatTable() {
- SparkSession spark = createSessionBuilder().getOrCreate();
- spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
- spark.sql("USE spark_catalog.test_db");
-
- spark.sql(
- "CREATE TABLE part_format_table (a INT, b INT, dt STRING) USING CSV PARTITIONED BY (dt)");
- spark.sql("INSERT INTO TABLE part_format_table VALUES (1, 1, '1'), (2, 2, '2')");
- assertThat(
- spark.sql("SELECT * FROM part_format_table").collectAsList().stream()
- .map(Row::toString)
- .collect(Collectors.toList()))
- .containsExactlyInAnyOrder("[1,1,1]", "[2,2,2]");
-
- // TODO assert partition paths, it should be true partitioned table
-
- spark.close();
- }
}
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
new file mode 100644
index 0000000000..6bd54a26e3
--- /dev/null
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/FormatTableTestBase.scala
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.sql
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.spark.PaimonHiveTestBase
+import org.apache.paimon.table.FormatTable
+
+import org.apache.spark.sql.Row
+
+abstract class FormatTableTestBase extends PaimonHiveTestBase {
+
+ override protected def beforeEach(): Unit = {
+ sql(s"USE $paimonHiveCatalogName")
+ }
+
+ test("Format table: write partitioned table") {
+ for (format <- Seq("csv", "orc", "parquet", "json")) {
+ withTable("t") {
+ sql(s"CREATE TABLE t (id INT, p1 INT, p2 INT) USING $format PARTITIONED BY (p1, p2)")
+ sql("INSERT INTO t VALUES (1, 2, 3)")
+
+ // check show create table
+ assert(
+ sql("SHOW CREATE TABLE t").collectAsList().toString.contains("PARTITIONED BY (p1, p2)"))
+
+ // check partition in file system
+ val table =
+ paimonCatalog.getTable(Identifier.create("default", "t")).asInstanceOf[FormatTable]
+ val dirs = table.fileIO().listStatus(new Path(table.location())).map(_.getPath.getName)
+ assert(dirs.count(_.startsWith("p1=")) == 1)
+
+ // check select
+ checkAnswer(sql("SELECT * FROM t"), Row(1, 2, 3))
+ checkAnswer(sql("SELECT id FROM t"), Row(1))
+ checkAnswer(sql("SELECT p1 FROM t"), Row(2))
+ checkAnswer(sql("SELECT p2 FROM t"), Row(3))
+ }
+ }
+ }
+}