This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 0f701668ec [core][rest] Support schema validation and infer for
external paimon table (#6501)
0f701668ec is described below
commit 0f701668ecfea45c5764dc4fd008a192d06442b3
Author: XiaoHongbo <[email protected]>
AuthorDate: Mon Nov 3 18:44:45 2025 +0800
[core][rest] Support schema validation and infer for external paimon table
(#6501)
---
.../org/apache/paimon/catalog/CatalogUtils.java | 3 +
.../java/org/apache/paimon/rest/RESTCatalog.java | 34 +-
.../org/apache/paimon/rest/RESTCatalogServer.java | 88 ++--
.../org/apache/paimon/rest/RESTCatalogTest.java | 127 ++++--
.../apache/paimon/rest/RESTFileSystemCatalog.java | 13 +
.../spark/table/PaimonExternalTableTest.scala | 490 +++++++++++++++++++++
6 files changed, 682 insertions(+), 73 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index e9510e8d55..d82016f931 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -25,6 +25,7 @@ import org.apache.paimon.fs.Path;
import org.apache.paimon.manifest.PartitionEntry;
import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
+import org.apache.paimon.rest.exceptions.NotImplementedException;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
@@ -323,6 +324,8 @@ public class CatalogUtils {
snapshot = optional.get();
}
} catch (Catalog.TableNotExistException ignored) {
+ } catch (NotImplementedException ignored) {
+ // version management is not supported for external paimon tables
}
}
tableAndSnapshots.add(Pair.of(table, snapshot));
diff --git a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
index 0d34e17bc8..1bf8ff2f35 100644
--- a/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/rest/RESTCatalog.java
@@ -21,6 +21,7 @@ package org.apache.paimon.rest;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
+import org.apache.paimon.TableType;
import org.apache.paimon.annotation.VisibleForTesting;
import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.CatalogContext;
@@ -33,6 +34,7 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.function.Function;
import org.apache.paimon.function.FunctionChange;
+import org.apache.paimon.options.Options;
import org.apache.paimon.partition.Partition;
import org.apache.paimon.partition.PartitionStatistics;
import org.apache.paimon.rest.exceptions.AlreadyExistsException;
@@ -48,6 +50,7 @@ import org.apache.paimon.rest.responses.GetTableResponse;
import org.apache.paimon.rest.responses.GetViewResponse;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.Instant;
import org.apache.paimon.table.Table;
@@ -71,12 +74,14 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import static org.apache.paimon.CoreOptions.BRANCH;
import static org.apache.paimon.CoreOptions.PATH;
+import static org.apache.paimon.CoreOptions.TYPE;
import static org.apache.paimon.catalog.CatalogUtils.checkNotBranch;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemDatabase;
import static org.apache.paimon.catalog.CatalogUtils.checkNotSystemTable;
@@ -443,7 +448,8 @@ public class RESTCatalog implements Catalog {
checkNotSystemTable(identifier, "createTable");
validateCreateTable(schema);
createExternalTablePathIfNotExist(schema);
- api.createTable(identifier, schema);
+ Schema newSchema = inferSchemaIfExternalPaimonTable(schema);
+ api.createTable(identifier, newSchema);
} catch (AlreadyExistsException e) {
if (!ignoreIfExists) {
throw new TableAlreadyExistException(identifier);
@@ -998,4 +1004,30 @@ public class RESTCatalog implements Catalog {
}
}
}
+
+    /**
+     * Picks the schema that is sent to the REST server when the request may refer to an external
+     * Paimon table.
+     *
+     * <p>When the options declare table type {@code TABLE} and carry a {@code path}, and a schema
+     * can already be loaded from that path, the loaded schema is returned with the caller's
+     * {@code path} (and {@code owner}, if one was given) written into its options. In every other
+     * case the given schema is returned as is.
+     *
+     * @param schema schema supplied by the caller of {@code createTable}
+     * @return the schema loaded from the external location, or {@code schema} itself
+     * @throws Exception if loading or validating the schema at the external location fails
+     */
+    private Schema inferSchemaIfExternalPaimonTable(Schema schema) throws Exception {
+        Map<String, String> userOptions = schema.options();
+        // The type option is parsed first, before the path is looked at.
+        TableType declaredType = Options.fromMap(userOptions).get(TYPE);
+        String location = userOptions.get(PATH.key());
+        if (!TableType.TABLE.equals(declaredType) || location == null) {
+            return schema;
+        }
+
+        Path tablePath = new Path(location);
+        SchemaManager manager = new SchemaManager(fileIOFromOptions(tablePath), tablePath);
+        Optional<TableSchema> stored = manager.latest();
+        if (!stored.isPresent()) {
+            return schema;
+        }
+
+        // Called for validation only: a schema already exists here, so no new table is created.
+        manager.createTable(schema, true);
+
+        Schema inferred = stored.get().toSchema();
+        Map<String, String> inferredOptions = inferred.options();
+        // `owner` and `path` are taken from the user-provided schema, not from the stored one.
+        String owner = userOptions.get(Catalog.OWNER_PROP);
+        if (owner != null) {
+            inferredOptions.put(Catalog.OWNER_PROP, owner);
+        }
+        inferredOptions.put(PATH.key(), userOptions.get(PATH.key()));
+        return inferred;
+    }
}
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
index 51d1218b2e..08fc1c9778 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogServer.java
@@ -683,6 +683,19 @@ public class RESTCatalogServer {
}
private MockResponse snapshotHandle(Identifier identifier) throws
Exception {
+ if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+ TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ if (tableMetadata.isExternal()) {
+ ErrorResponse response =
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE,
+ identifier.getFullName(),
+ "external paimon table does not support get table
snapshot in rest server",
+ 501);
+ return mockResponse(response, 404);
+ }
RESTResponse response;
Optional<TableSnapshot> snapshotOptional =
Optional.ofNullable(tableLatestSnapshotStore.get(identifier.getFullName()));
@@ -714,6 +727,7 @@ public class RESTCatalogServer {
}
private MockResponse loadSnapshot(Identifier identifier, String version)
throws Exception {
+
FileStoreTable table = (FileStoreTable) catalog.getTable(identifier);
SnapshotManager snapshotManager = table.snapshotManager();
Snapshot snapshot = null;
@@ -1279,13 +1293,16 @@ public class RESTCatalogServer {
tableMetadata = createObjectTable(identifier, schema);
} else {
catalog.createTable(identifier, schema, false);
+ boolean isExternal =
+ schema.options() != null
+ &&
schema.options().containsKey(PATH.key());
tableMetadata =
createTableMetadata(
requestBody.getIdentifier(),
0L,
requestBody.getSchema(),
UUID.randomUUID().toString(),
- false);
+ isExternal);
}
tableMetadataStore.put(
requestBody.getIdentifier().getFullName(),
tableMetadata);
@@ -1510,10 +1527,16 @@ public class RESTCatalogServer {
alterTableImpl(identifier, requestBody.getChanges());
return new MockResponse().setResponseCode(200);
case "DELETE":
- try {
- catalog.dropTable(identifier, false);
- } catch (Exception e) {
- System.out.println(e.getMessage());
+ if (!tableMetadataStore.containsKey(identifier.getFullName()))
{
+ return new MockResponse().setResponseCode(404);
+ }
+ tableMetadata =
tableMetadataStore.get(identifier.getFullName());
+ if (!tableMetadata.isExternal()) {
+ try {
+ catalog.dropTable(identifier, false);
+ } catch (Exception e) {
+ System.out.println(e.getMessage());
+ }
}
tableMetadataStore.remove(identifier.getFullName());
tableLatestSnapshotStore.remove(identifier.getFullName());
@@ -1532,7 +1555,7 @@ public class RESTCatalogServer {
throw new Catalog.TableNoPermissionException(fromTable);
} else if (tableMetadataStore.containsKey(fromTable.getFullName())) {
TableMetadata tableMetadata =
tableMetadataStore.get(fromTable.getFullName());
- if (!isFormatTable(tableMetadata.schema().toSchema())) {
+ if (!isFormatTable(tableMetadata.schema().toSchema()) &&
!tableMetadata.isExternal()) {
catalog.renameTable(requestBody.getSource(),
requestBody.getDestination(), false);
}
if (tableMetadataStore.containsKey(toTable.getFullName())) {
@@ -2066,6 +2089,17 @@ public class RESTCatalogServer {
Snapshot snapshot,
List<PartitionStatistics> statistics)
throws Catalog.TableNotExistException {
+ if (!tableMetadataStore.containsKey(identifier.getFullName())) {
+ throw new Catalog.TableNotExistException(identifier);
+ }
+ boolean isExternal =
tableMetadataStore.get(identifier.getFullName()).isExternal();
+ if (isExternal) {
+ new ErrorResponse(
+ ErrorResponse.RESOURCE_TYPE_TABLE,
+ identifier.getFullName(),
+ "external paimon table does not support commit in rest
server",
+ 501);
+ }
FileStoreTable table = getFileTable(identifier);
if (!tableId.equals(table.catalogEnvironment().uuid())) {
throw new Catalog.TableNotExistException(identifier);
@@ -2223,7 +2257,10 @@ public class RESTCatalogServer {
private TableMetadata createTableMetadata(
Identifier identifier, long schemaId, Schema schema, String uuid,
boolean isExternal) {
Map<String, String> options = new HashMap<>(schema.options());
- Path path = catalog.getTableLocation(identifier);
+ Path path =
+ isExternal && Objects.nonNull(schema.options().get(PATH.key()))
+ ? new Path(schema.options().get(PATH.key()))
+ : catalog.getTableLocation(identifier);
String restPath = path.toString();
if (this.configResponse
.getDefaults()
@@ -2261,27 +2298,22 @@ public class RESTCatalogServer {
return createTableMetadata(identifier, 1L, newSchema,
UUID.randomUUID().toString(), false);
}
- private FileStoreTable getFileTable(Identifier identifier)
- throws Catalog.TableNotExistException {
- if (tableMetadataStore.containsKey(identifier.getFullName())) {
- TableMetadata tableMetadata =
tableMetadataStore.get(identifier.getFullName());
- TableSchema schema = tableMetadata.schema();
- CatalogEnvironment catalogEnv =
- new CatalogEnvironment(
- identifier,
- tableMetadata.uuid(),
- catalog.catalogLoader(),
- catalog.lockFactory().orElse(null),
- catalog.lockContext().orElse(null),
- catalogContext,
- false);
- Path path = new Path(schema.options().get(PATH.key()));
- FileIO dataFileIO = catalog.fileIO();
- FileStoreTable table =
- FileStoreTableFactory.create(dataFileIO, path, schema,
catalogEnv);
- return table;
- }
- throw new Catalog.TableNotExistException(identifier);
+    /**
+     * Builds a {@link FileStoreTable} from the metadata registered for {@code identifier}.
+     *
+     * <p>The table path is read from the {@code path} option of the registered schema.
+     *
+     * <p>NOTE(review): no existence check is done here; callers are presumably expected to verify
+     * that {@code tableMetadataStore} contains the identifier before calling - confirm.
+     *
+     * @param identifier identifier whose full name is the key into {@code tableMetadataStore}
+     * @return the table created by {@link FileStoreTableFactory} for that metadata
+     */
+    private FileStoreTable getFileTable(Identifier identifier) {
+        TableMetadata metadata = tableMetadataStore.get(identifier.getFullName());
+        TableSchema tableSchema = metadata.schema();
+        CatalogEnvironment environment =
+                new CatalogEnvironment(
+                        identifier,
+                        metadata.uuid(),
+                        catalog.catalogLoader(),
+                        catalog.lockFactory().orElse(null),
+                        catalog.lockContext().orElse(null),
+                        catalogContext,
+                        false);
+        Path tablePath = new Path(tableSchema.options().get(PATH.key()));
+        FileIO tableFileIO = catalog.fileIO();
+        return FileStoreTableFactory.create(tableFileIO, tablePath, tableSchema, environment);
     }
private static int getMaxResults(Map<String, String> parameters) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
index 5d76946023..2a7c1945bf 100644
--- a/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/rest/RESTCatalogTest.java
@@ -18,6 +18,7 @@
package org.apache.paimon.rest;
+import org.apache.paimon.CoreOptions;
import org.apache.paimon.PagedList;
import org.apache.paimon.Snapshot;
import org.apache.paimon.TableType;
@@ -2686,66 +2687,102 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
Table tableAgain = catalog.getTable(identifier);
assertThat(tableAgain).isNotNull();
assertThat(tableAgain.comment()).isEqualTo(Optional.of("External table
for testing"));
+ }
+
+ @Test
+ public void testCreateExternalTableWithSchemaInference(@TempDir
java.nio.file.Path path)
+ throws Exception {
+ Path externalTablePath = new Path(path.toString(),
"external_table_inference_location");
+ DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(),
externalTablePath.toString());
+ restCatalog.createDatabase("test_schema_inference_db", true);
+ Identifier identifier =
+ Identifier.create("test_schema_inference_db",
"external_inference_table");
+ try {
+ catalog.dropTable(identifier, true);
+ } catch (Exception e) {
+ // Ignore drop errors
+ }
+
+ createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA);
+ Schema emptySchema =
+ new Schema(
+ Lists.newArrayList(),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ DEFAULT_TABLE_SCHEMA.options(),
+ "");
+ catalog.createTable(identifier, emptySchema, false);
- testReadSystemTables();
+ Table table = catalog.getTable(identifier);
+ assertThat(table).isNotNull();
+ assertThat(table.rowType().getFieldCount()).isEqualTo(3);
+ assertThat(table.rowType().getFieldNames()).containsExactly("pk",
"col1", "col2");
- // Verify external table path still exists after operations
- assertTrue(
- fileIO.exists(externalTablePath),
- "External table path should still exist after operations");
+ Schema clientProvidedSchema =
+ new Schema(
+ Lists.newArrayList(
+ new DataField(0, "pk", DataTypes.INT()),
+ new DataField(1, "col1", DataTypes.STRING())),
+ Collections.emptyList(),
+ Collections.emptyList(),
+ DEFAULT_TABLE_SCHEMA.options(),
+ "");
+ // schema mismatch should throw an exception
+ Assertions.assertThrows(
+ RuntimeException.class,
+ () -> catalog.createTable(identifier, clientProvidedSchema,
false));
+ DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
+ }
- // Test dropping external table - data should remain
- catalog.dropTable(identifier, false);
+ @Test
+ public void testReadSystemTablesWithExternalTable(@TempDir
java.nio.file.Path path)
+ throws Exception {
+ // Create an external table
+ Path externalTablePath = new Path(path.toString(),
"external_sys_table_location");
+ DEFAULT_TABLE_SCHEMA.options().put(CoreOptions.PATH.key(),
externalTablePath.toString());
- // Verify external table path still exists after drop (external table
behavior)
- assertTrue(
- fileIO.exists(externalTablePath),
- "External table path should still exist after drop");
+ restCatalog.createDatabase("test_sys_table_db", true);
+ Identifier identifier = Identifier.create("test_sys_table_db",
"external_sys_table");
- // Clean up
try {
- fileIO.deleteQuietly(externalTablePath);
+ catalog.dropTable(identifier, true);
} catch (Exception e) {
- // Ignore cleanup errors
+ // Ignore drop errors
}
- }
- private void testReadSystemTables() throws IOException,
Catalog.TableNotExistException {
+ createExternalTableDirectory(externalTablePath, DEFAULT_TABLE_SCHEMA);
+ catalog.createTable(identifier, DEFAULT_TABLE_SCHEMA, false);
+
+ // Test reading system table with external table
Identifier allTablesIdentifier = Identifier.create("sys", "tables");
Table allTablesTable = catalog.getTable(allTablesIdentifier);
+ assertThat(allTablesTable).isNotNull();
- if (allTablesTable != null) {
- ReadBuilder allTablesReadBuilder = allTablesTable.newReadBuilder();
- TableRead allTablesRead = allTablesReadBuilder.newRead();
- List<Split> allTablesSplits =
allTablesReadBuilder.newScan().plan().splits();
+ ReadBuilder readBuilder = allTablesTable.newReadBuilder();
+ TableRead read = readBuilder.newRead();
+ List<Split> splits = readBuilder.newScan().plan().splits();
- List<InternalRow> allTablesResults = new ArrayList<>();
- for (Split split : allTablesSplits) {
- try (RecordReader<InternalRow> reader =
allTablesRead.createReader(split)) {
- reader.forEachRemaining(allTablesResults::add);
- }
+ List<InternalRow> results = new ArrayList<>();
+ for (Split split : splits) {
+ try (RecordReader<InternalRow> reader = read.createReader(split)) {
+ reader.forEachRemaining(results::add);
}
+ }
- // Verify that our external table appears in ALL_TABLES
- assertThat(allTablesResults).isNotEmpty();
-
- // Find our external table in the results
- boolean foundExternalTable = false;
- for (InternalRow row : allTablesResults) {
- String tableName = row.getString(1).toString(); // table_name
column
- String databaseName = row.getString(0).toString(); //
database_name column
- if ("external_test_table".equals(tableName)
- && "test_external_table_db".equals(databaseName)) {
- foundExternalTable = true;
- // Verify table properties
- String tableType = row.getString(2).toString(); //
table_type column
- assertThat(tableType)
- .isEqualTo("table"); // External tables are still
MANAGED type
- break;
- }
+ // Verify external table appears in system table
+ assertThat(results).isNotEmpty();
+ boolean foundExternalTable = false;
+ for (InternalRow row : results) {
+ String databaseName = row.getString(0).toString();
+ String tableName = row.getString(1).toString();
+ if ("test_sys_table_db".equals(databaseName)
+ && "external_sys_table".equals(tableName)) {
+ foundExternalTable = true;
+ break;
}
- assertThat(foundExternalTable).isTrue();
}
+ assertThat(foundExternalTable).isTrue();
+ DEFAULT_TABLE_SCHEMA.options().remove(CoreOptions.PATH.key());
}
protected void createTable(
@@ -2828,7 +2865,9 @@ public abstract class RESTCatalogTest extends
CatalogTestBase {
private void createExternalTableDirectory(Path externalTablePath, Schema
schema)
throws Exception {
// Create external table directory structure
- FileIO fileIO = FileIO.get(externalTablePath,
CatalogContext.create(new Options()));
+ FileIO fileIO =
+ FileIO.get(
+ externalTablePath, CatalogContext.create(new
Options(catalog.options())));
// Create the external table directory
if (!fileIO.exists(externalTablePath)) {
diff --git
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
index 26a6315347..5fd3aaf748 100644
---
a/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
+++
b/paimon-core/src/test/java/org/apache/paimon/rest/RESTFileSystemCatalog.java
@@ -20,8 +20,12 @@ package org.apache.paimon.rest;
import org.apache.paimon.catalog.CatalogContext;
import org.apache.paimon.catalog.FileSystemCatalog;
+import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.schema.Schema;
+
+import static org.apache.paimon.CoreOptions.PATH;
/**
* A FileSystemCatalog that supports custom table paths for REST catalog
server. This allows REST
@@ -37,4 +41,13 @@ public class RESTFileSystemCatalog extends FileSystemCatalog
{
protected boolean allowCustomTablePath() {
return true;
}
+
+    /**
+     * Creates the table through {@link FileSystemCatalog}, unless the schema options contain a
+     * {@code path} entry. Such schemas are treated as external tables and this method does
+     * nothing for them.
+     */
+    @Override
+    public void createTable(Identifier identifier, Schema schema, boolean ignoreIfExists)
+            throws TableAlreadyExistException, DatabaseNotExistException {
+        if (schema.options() != null && schema.options().containsKey(PATH.key())) {
+            // External table: skip creation in the backing file system catalog.
+            return;
+        }
+        super.createTable(identifier, schema, ignoreIfExists);
+    }
}
diff --git
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
new file mode 100644
index 0000000000..1ea9b2ee81
--- /dev/null
+++
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/table/PaimonExternalTableTest.scala
@@ -0,0 +1,490 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.spark.table
+
+import org.apache.paimon.catalog.Identifier
+import org.apache.paimon.fs.Path
+import org.apache.paimon.fs.local.LocalFileIO
+import org.apache.paimon.schema.{Schema, SchemaManager}
+import org.apache.paimon.spark.PaimonSparkTestWithRestCatalogBase
+import org.apache.paimon.table.FileStoreTable
+import org.apache.paimon.types.DataTypes
+import org.apache.paimon.utils.StringUtils
+
+import org.apache.spark.sql.Row
+
+class PaimonExternalTableTest extends PaimonSparkTestWithRestCatalogBase {
+
+ override protected def beforeEach(): Unit = {
+ super.beforeEach()
+ sql("USE paimon")
+ sql("CREATE DATABASE IF NOT EXISTS test_db")
+ sql("USE test_db")
+ // Clean up any existing tables from previous test runs
+ sql("DROP TABLE IF EXISTS external_tbl")
+ sql("DROP TABLE IF EXISTS managed_tbl")
+ sql("DROP TABLE IF EXISTS external_tbl_renamed")
+ sql("DROP TABLE IF EXISTS t1")
+ sql("DROP TABLE IF EXISTS t2")
+ }
+
+ test("PaimonExternalTable: create and drop external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl", "managed_tbl") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+ // Ensure table doesn't exist before starting
+ sql("DROP TABLE IF EXISTS external_tbl")
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ val schema = Schema
+ .newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ // create external table
+ sql(
+ s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon
LOCATION '$externalTbLocation'")
+ sql("INSERT INTO external_tbl VALUES (1, 'Alice'), (2, 'Bob')")
+ checkAnswer(
+ sql("SELECT * FROM external_tbl ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob"))
+ )
+
+ val table = paimonCatalog
+ .getTable(Identifier.create("test_db", "external_tbl"))
+ .asInstanceOf[FileStoreTable]
+ val fileIO = table.fileIO()
+ val actualTbLocation = table.location()
+
+ // For REST catalog, the path might be managed internally, but the
table should still function as external
+ // Verify that the table has a location and is accessible
+ assert(actualTbLocation != null, "External table should have a
location")
+
+ // Verify data is accessible
+ assert(fileIO.exists(actualTbLocation), "External table location
should exist")
+
+ // drop external table - data should still exist (this is the key
characteristic of external tables)
+ sql("DROP TABLE external_tbl")
+ assert(fileIO.exists(actualTbLocation), "External table data should
exist after drop")
+
+ // Invalidate catalog cache to ensure table is fully removed
+ try {
+ paimonCatalog.invalidateTable(Identifier.create("test_db",
"external_tbl"))
+ } catch {
+ case _: Exception => // Ignore if table doesn't exist in cache
+ }
+
+ // Wait a bit and ensure table is fully dropped before recreating
+ Thread.sleep(100) // Give catalog time to fully process the drop
+ sql("DROP TABLE IF EXISTS external_tbl")
+
+ // Schema already exists in filesystem from initial creation, no
need to recreate
+ // create external table again using the same location - should be
able to read existing data
+ sql(
+ s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon
LOCATION '$externalTbLocation'")
+ checkAnswer(
+ sql("SELECT * FROM external_tbl ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob"))
+ )
+
+ // create managed table for comparison
+ sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon")
+ sql("INSERT INTO managed_tbl VALUES (3, 'Charlie')")
+ val managedTable = paimonCatalog
+ .getTable(Identifier.create("test_db", "managed_tbl"))
+ .asInstanceOf[FileStoreTable]
+ val managedTbLocation = managedTable.location()
+
+ // drop managed table - data should be deleted
+ sql("DROP TABLE managed_tbl")
+ assert(
+ !fileIO.exists(managedTbLocation),
+ "Managed table data should not exist after drop")
+ }
+ }
+ }
+
+ test("PaimonExternalTable: partitioned external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ val schema = Schema
+ .newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("value", DataTypes.DOUBLE())
+ .column("dept", DataTypes.STRING())
+ .partitionKeys("dept")
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ sql(s"""
+ |CREATE TABLE external_tbl (id INT, name STRING, value
DOUBLE) USING paimon
+ |PARTITIONED BY (dept STRING)
+ |LOCATION '$externalTbLocation'
+ |""".stripMargin)
+
+ sql(
+ "INSERT INTO external_tbl VALUES " +
+ "(1, 'Alice', 10.5, 'Engineering')," +
+ "(2, 'Bob', 20.7, 'Engineering')," +
+ "(3, 'Charlie', 30.9, 'Sales')," +
+ "(4, 'David', 25.3, 'Sales')")
+
+ // Test reading all data
+ checkAnswer(
+ sql("SELECT * FROM external_tbl ORDER BY id"),
+ Seq(
+ Row(1, "Alice", 10.5, "Engineering"),
+ Row(2, "Bob", 20.7, "Engineering"),
+ Row(3, "Charlie", 30.9, "Sales"),
+ Row(4, "David", 25.3, "Sales")
+ )
+ )
+
+ // Test partition filtering
+ checkAnswer(
+ sql("SELECT * FROM external_tbl WHERE dept = 'Engineering' ORDER
BY id"),
+ Seq(
+ Row(1, "Alice", 10.5, "Engineering"),
+ Row(2, "Bob", 20.7, "Engineering")
+ )
+ )
+
+ // Test column projection with partition filtering
+ checkAnswer(
+ sql("SELECT name, value FROM external_tbl WHERE dept = 'Sales'
ORDER BY id"),
+ Seq(
+ Row("Charlie", 30.9),
+ Row("David", 25.3)
+ )
+ )
+
+ // Verify this is an external table - drop and check data exists
+ val table = paimonCatalog
+ .getTable(Identifier.create("test_db", "external_tbl"))
+ .asInstanceOf[FileStoreTable]
+ val fileIO = table.fileIO()
+ val actualTbLocation = table.location()
+
+ sql("DROP TABLE external_tbl")
+ assert(fileIO.exists(actualTbLocation), "External table data should
exist after drop")
+ }
+ }
+ }
+
+ test("PaimonExternalTable: rename external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl", "external_tbl_renamed") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ val schema = Schema
+ .newBuilder()
+ .column("id", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ // create external table
+ sql(
+ s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon
LOCATION '$externalTbLocation'")
+ sql("INSERT INTO external_tbl VALUES (1, 'Alice')")
+ val originalLocation = paimonCatalog
+ .getTable(Identifier.create("test_db", "external_tbl"))
+ .asInstanceOf[FileStoreTable]
+ .location()
+
+ // rename external table, location should not change
+ sql("ALTER TABLE external_tbl RENAME TO external_tbl_renamed")
+ checkAnswer(
+ sql("SELECT * FROM external_tbl_renamed"),
+ Seq(Row(1, "Alice"))
+ )
+
+ val renamedTable = paimonCatalog
+ .getTable(Identifier.create("test_db", "external_tbl_renamed"))
+ .asInstanceOf[FileStoreTable]
+ val renamedLocation = renamedTable.location()
+ assert(
+ renamedLocation.toString.equals(originalLocation.toString),
+ "External table location should not change after rename"
+ )
+ }
+ }
+ }
+
+ test("PaimonExternalTable: create external table without schema") {
+ withTempDir {
+ tbLocation =>
+ withTable("t1", "t2") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ val schema = Schema
+ .newBuilder()
+ .column("id", DataTypes.INT())
+ .column("pt", DataTypes.INT())
+ .partitionKeys("pt")
+ .primaryKey("id")
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ // First create a table with schema and data
+ sql(s"""
+ |CREATE TABLE t1 (id INT, pt INT) USING paimon
+ |PARTITIONED BY (pt)
+ |TBLPROPERTIES('primary-key' = 'id')
+ |LOCATION '$externalTbLocation'
+ |""".stripMargin)
+ sql("INSERT INTO t1 VALUES (1, 1), (2, 2)")
+
+ // create external table without schema - should infer from existing
table
+ sql(s"CREATE TABLE t2 USING paimon LOCATION '$externalTbLocation'")
+ checkAnswer(
+ sql("SELECT * FROM t2 ORDER BY id"),
+ Seq(Row(1, 1), Row(2, 2))
+ )
+
+ val table2 =
+ paimonCatalog.getTable(Identifier.create("test_db",
"t2")).asInstanceOf[FileStoreTable]
+ val table2Location = table2.location()
+ // Verify table2 can access the data from table1's location
+ assert(table2Location != null, "Table t2 should have a location")
+ // The key point is that t2 can read data from the same location as
t1
+ assert(table2.fileIO().exists(table2Location), "Table t2 location
should exist")
+ }
+ }
+ }
+
+ // Scenario: an external table is created with LOCATION set to the path of an
+ // existing managed table. The test checks that the external table returns the
+ // managed table's rows, that both tables report the same location, and that
+ // after DROP TABLE on the managed table the location no longer exists and the
+ // external table returns no rows.
+ test("PaimonExternalTable: create external table on managed table location")
{
+ withTable("external_tbl", "managed_tbl") {
+ // Create managed table first
+ sql("CREATE TABLE managed_tbl (id INT, name STRING) USING paimon")
+ sql("INSERT INTO managed_tbl VALUES (1, 'Alice'), (2, 'Bob')")
+ checkAnswer(
+ sql("SELECT * FROM managed_tbl ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob"))
+ )
+
+ val managedTable = paimonCatalog
+ .getTable(Identifier.create("test_db", "managed_tbl"))
+ .asInstanceOf[FileStoreTable]
+ val managedLocation = managedTable.location()
+ // tablePath is the scheme-less form of managedLocation that is later passed to
+ // LOCATION. For a "file" scheme the text after the first ':' is used unchanged;
+ // for any other scheme the text after the scheme is canonicalized through
+ // java.io.File, falling back to the un-canonicalized remainder on any exception.
+ // NOTE(review): once contains(":") is true, split(":", 2) always yields two
+ // parts, so the `parts.length == 2` check looks redundant - confirm before
+ // simplifying.
+ // Extract actual file system path, removing any scheme prefix (e.g.,
"file:", "rest:")
+ val tablePath = if (managedLocation.toString.contains(":")) {
+ // Path has scheme, extract the path part after the first ":"
+ val parts = managedLocation.toString.split(":", 2)
+ if (parts.length == 2 && parts(0).equals("file")) {
+ parts(1) // For file: scheme, use the path directly
+ } else {
+ // For other schemes or if parsing fails, try to get canonical path
+ try {
+ new java.io.File(managedLocation.toString.replaceFirst("^[^:]+:",
"")).getCanonicalPath
+ } catch {
+ case _: Exception =>
managedLocation.toString.replaceFirst("^[^:]+:", "")
+ }
+ }
+ } else {
+ managedLocation.toString
+ }
+
+ // For REST catalog, managed table already has schema, no need to create
schema again
+ // Create external table pointing to managed table location
+ sql(s"CREATE TABLE external_tbl (id INT, name STRING) USING paimon
LOCATION '$tablePath'")
+ checkAnswer(
+ sql("SELECT * FROM external_tbl ORDER BY id"),
+ Seq(Row(1, "Alice"), Row(2, "Bob"))
+ )
+
+ val externalTable = paimonCatalog
+ .getTable(Identifier.create("test_db", "external_tbl"))
+ .asInstanceOf[FileStoreTable]
+ // The "file:" text is removed from the external table's location before the
+ // comparison so that it lines up with the scheme-less tablePath built above.
+ assert(
+ StringUtils.replace(externalTable.location().toString, "file:",
"").equals(tablePath),
+ "External table should point to managed table location"
+ )
+
+ // Drop managed table - managed table deletion will delete data files
+ // since external table points to the same location, data will be deleted
+ sql("DROP TABLE managed_tbl")
+ // The existence check uses the FileIO and location of the externalTable handle
+ // that was obtained before the drop.
+ val fileIO = externalTable.fileIO()
+ assert(
+ !fileIO.exists(externalTable.location()),
+ "Data should be deleted after dropping managed table since external
table points to managed table location"
+ )
+
+ // External table cannot read data anymore since data was deleted with
managed table
+ // This demonstrates that external table pointing to managed table
location shares the same data
+ checkAnswer(
+ sql("SELECT * FROM external_tbl ORDER BY id"),
+ Seq.empty
+ )
+ }
+ }
+
+ // Scenario: INSERT OVERWRITE on an unpartitioned external table. Two rows are
+ // inserted and verified, then an INSERT OVERWRITE without a partition clause is
+ // issued and the table is expected to contain only the overwriting rows.
+ test("PaimonExternalTable: insert overwrite on external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ // The pre-created schema declares the same columns (age INT, name STRING) as
+ // the CREATE TABLE statement issued below.
+ val schema = Schema
+ .newBuilder()
+ .column("age", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ // NOTE(review): the boolean passed to createTable is presumably an
+ // "ignore if exists" flag - confirm against SchemaManager.
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ sql(
+ s"CREATE TABLE external_tbl (age INT, name STRING) USING paimon
LOCATION '$externalTbLocation'")
+
+ sql("INSERT INTO external_tbl VALUES (5, 'Ben'), (7, 'Larry')")
+ checkAnswer(
+ sql("SELECT age, name FROM external_tbl ORDER BY age"),
+ Seq(Row(5, "Ben"), Row(7, "Larry"))
+ )
+
+ // After the overwrite only the new rows are expected; 'Ben' and 'Larry' must
+ // no longer be returned.
+ sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry'), (7, 'Tom')")
+ checkAnswer(
+ sql("SELECT age, name FROM external_tbl ORDER BY age"),
+ Seq(Row(5, "Jerry"), Row(7, "Tom"))
+ )
+ }
+ }
+ }
+
+ // Scenario: INSERT OVERWRITE on an external table partitioned by `id`.
+ // Step 1: a static-partition overwrite (PARTITION (id = 1)) replaces the rows
+ // previously inserted into that partition.
+ // Step 2: with spark.sql.sources.partitionOverwriteMode set to "dynamic", an
+ // overwrite without a PARTITION clause is expected to replace id = 1, create
+ // id = 2, and leave the row in id = 3 in place.
+ test("PaimonExternalTable: insert overwrite on partitioned external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ // Column order in the pre-created schema is (age, name, id), with id as the
+ // only partition key.
+ val schema = Schema
+ .newBuilder()
+ .column("age", DataTypes.INT())
+ .column("name", DataTypes.STRING())
+ .column("id", DataTypes.INT())
+ .partitionKeys("id")
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ // NOTE(review): the boolean passed to createTable is presumably an
+ // "ignore if exists" flag - confirm against SchemaManager.
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ // The partition column is declared with its type inside PARTITIONED BY rather
+ // than in the column list.
+ sql(s"""
+ |CREATE TABLE external_tbl (age INT, name STRING) USING paimon
+ |PARTITIONED BY (id INT)
+ |LOCATION '$externalTbLocation'
+ |""".stripMargin)
+
+ sql("INSERT INTO external_tbl PARTITION (id = 1) VALUES (5, 'Ben'),
(7, 'Larry')")
+ sql("INSERT OVERWRITE external_tbl PARTITION (id = 1) VALUES (5,
'Jerry'), (7, 'Tom')")
+ checkAnswer(
+ sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"),
+ Seq(Row(1, 5, "Jerry"), Row(1, 7, "Tom"))
+ )
+
+ sql("INSERT INTO external_tbl PARTITION (id = 3) VALUES (5,
'Alice')")
+ // In the VALUES below the partition value is supplied last (age, name, id),
+ // while the verifying SELECT lists id first.
+ // Use dynamic partition overwrite mode to only overwrite partitions
present in data
+ withSparkSQLConf("spark.sql.sources.partitionOverwriteMode" ->
"dynamic") {
+ sql("INSERT OVERWRITE external_tbl VALUES (5, 'Jerry', 1), (7,
'Tom', 2)")
+ }
+ checkAnswer(
+ sql("SELECT id, age, name FROM external_tbl ORDER BY id, age"),
+ Seq(Row(1, 5, "Jerry"), Row(2, 7, "Tom"), Row(3, 5, "Alice"))
+ )
+ }
+ }
+ }
+
+ // Scenario: SHOW PARTITIONS on an external table partitioned by (p1, p2). The
+ // statement is checked three ways: without a filter, with a partial partition
+ // spec (p1 only), and with a full partition spec (p1 and p2).
+ test("PaimonExternalTable: show partitions on external table") {
+ withTempDir {
+ tbLocation =>
+ withTable("external_tbl") {
+ val externalTbLocation = tbLocation.getCanonicalPath
+
+ // For REST catalog external tables, schema must be created in
filesystem first
+ val schemaTablePath = new Path(externalTbLocation)
+ val schemaFileIO = LocalFileIO.create()
+ val schema = Schema
+ .newBuilder()
+ .column("id", DataTypes.INT())
+ .column("p1", DataTypes.INT())
+ .column("p2", DataTypes.STRING())
+ .partitionKeys("p1", "p2")
+ .option("path", externalTbLocation)
+ .option("type", "table")
+ .build()
+ // NOTE(review): the boolean passed to createTable is presumably an
+ // "ignore if exists" flag - confirm against SchemaManager.
+ new SchemaManager(schemaFileIO, schemaTablePath).createTable(schema,
true)
+
+ sql(s"""
+ |CREATE TABLE external_tbl (id INT, p1 INT, p2 STRING) USING
paimon
+ |PARTITIONED BY (p1, p2)
+ |LOCATION '$externalTbLocation'
+ |""".stripMargin)
+
+ // Four inserts, but the first two share (p1=1, p2='1'), so only three
+ // distinct partitions are expected from SHOW PARTITIONS.
+ sql("INSERT INTO external_tbl VALUES (1, 1, '1')")
+ sql("INSERT INTO external_tbl VALUES (2, 1, '1')")
+ sql("INSERT INTO external_tbl VALUES (3, 2, '1')")
+ sql("INSERT INTO external_tbl VALUES (3, 2, '2')")
+
+ checkAnswer(
+ sql("SHOW PARTITIONS external_tbl"),
+ Seq(Row("p1=1/p2=1"), Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+ checkAnswer(
+ sql("SHOW PARTITIONS external_tbl PARTITION (p1=2)"),
+ Seq(Row("p1=2/p2=1"), Row("p1=2/p2=2")))
+ checkAnswer(
+ sql("SHOW PARTITIONS external_tbl PARTITION (p1=2, p2='2')"),
+ Seq(Row("p1=2/p2=2")))
+ }
+ }
+ }
+}