This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new b20907ed8b [spark] Support spark to create external table sql statements. (#4576)
b20907ed8b is described below
commit b20907ed8b9a9542901bb34a880b3ff864931e4e
Author: Kerwin <[email protected]>
AuthorDate: Mon Nov 25 10:30:22 2024 +0800
[spark] Support spark to create external table sql statements. (#4576)
---
.../java/org/apache/paimon/hive/HiveCatalog.java | 20 +++++++---
.../paimon/spark/SparkCatalogWithHiveTest.java | 45 ++++++++++++++++++++++
2 files changed, 60 insertions(+), 5 deletions(-)
diff --git a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
index b92c3b59d9..ebd5a1edf8 100644
--- a/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
+++ b/paimon-hive/paimon-hive-catalog/src/main/java/org/apache/paimon/hive/HiveCatalog.java
@@ -135,6 +135,7 @@ public class HiveCatalog extends AbstractCatalog {
"org.apache.paimon.hive.PaimonStorageHandler";
private static final String HIVE_PREFIX = "hive.";
public static final String HIVE_SITE_FILE = "hive-site.xml";
+ private static final String HIVE_EXTERNAL_TABLE_PROP = "EXTERNAL";
private final HiveConf hiveConf;
private final String clientClassName;
@@ -218,7 +219,7 @@ public class HiveCatalog extends AbstractCatalog {
externalTable = true;
location = new Path(tableOptions.get(CoreOptions.PATH.key()));
} else {
- externalTable = usingExternalTable();
+ externalTable = usingExternalTable(tableOptions);
location = getTableLocation(identifier, null);
}
return Pair.of(location, externalTable);
@@ -659,12 +660,18 @@ public class HiveCatalog extends AbstractCatalog {
}
}
- private boolean usingExternalTable() {
+ private boolean usingExternalTable(Map<String, String> tableOptions) {
CatalogTableType tableType =
OptionsUtils.convertToEnum(
hiveConf.get(TABLE_TYPE.key(), CatalogTableType.MANAGED.toString()),
CatalogTableType.class);
- return CatalogTableType.EXTERNAL.equals(tableType);
+
+ String externalPropValue =
+ tableOptions.getOrDefault(
+ HIVE_EXTERNAL_TABLE_PROP.toLowerCase(),
+ tableOptions.get(HIVE_EXTERNAL_TABLE_PROP.toUpperCase()));
+ return CatalogTableType.EXTERNAL.equals(tableType)
+ || "TRUE".equalsIgnoreCase(externalPropValue);
}
@Override
@@ -962,7 +969,10 @@ public class HiveCatalog extends AbstractCatalog {
if (newTable == null) {
newTable =
createHiveTable(
- identifier, tableSchema, location, usingExternalTable());
+ identifier,
+ tableSchema,
+ location,
+ usingExternalTable(tableSchema.options()));
}
Table finalNewTable = newTable;
clients.execute(client -> client.createTable(finalNewTable));
@@ -1081,7 +1091,7 @@ public class HiveCatalog extends AbstractCatalog {
table.getParameters().put(TYPE.key(), FORMAT_TABLE.toString());
}
if (externalTable) {
- table.getParameters().put("EXTERNAL", "TRUE");
+ table.getParameters().put(HIVE_EXTERNAL_TABLE_PROP, "TRUE");
}
return table;
}
diff --git a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
index 68cf91b8ec..45ccd06479 100644
--- a/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
+++ b/paimon-spark/paimon-spark-common/src/test/java/org/apache/paimon/spark/SparkCatalogWithHiveTest.java
@@ -19,7 +19,9 @@
package org.apache.paimon.spark;
import org.apache.paimon.fs.Path;
+import org.apache.paimon.fs.local.LocalFileIO;
import org.apache.paimon.hive.TestHiveMetastore;
+import org.apache.paimon.table.FileStoreTableFactory;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
@@ -32,6 +34,7 @@ import java.io.FileNotFoundException;
import java.util.stream.Collectors;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatCode;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
/** Base tests for spark read. */
@@ -136,4 +139,46 @@ public class SparkCatalogWithHiveTest {
spark.close();
}
+
+ @Test
+ public void testCreateExternalTable(@TempDir java.nio.file.Path tempDir) {
+ Path warehousePath = new Path("file:" + tempDir.toString());
+ SparkSession spark =
+ SparkSession.builder()
+ .config("spark.sql.warehouse.dir", warehousePath.toString())
+ // with hive metastore
+ .config("spark.sql.catalogImplementation", "hive")
+ .config("hive.metastore.uris", "thrift://localhost:" + PORT)
+ .config("spark.sql.catalog.spark_catalog", SparkCatalog.class.getName())
+ .config("spark.sql.catalog.spark_catalog.metastore", "hive")
+ .config(
+ "spark.sql.catalog.spark_catalog.hive.metastore.uris",
+ "thrift://localhost:" + PORT)
+ .config(
+ "spark.sql.catalog.spark_catalog.warehouse",
+ warehousePath.toString())
+ .master("local[2]")
+ .getOrCreate();
+
+ spark.sql("CREATE DATABASE IF NOT EXISTS test_db");
+ spark.sql("USE spark_catalog.test_db");
+
+ // create hive external table
+ spark.sql("CREATE EXTERNAL TABLE t1 (a INT, bb INT, c STRING)");
+
+ // drop hive external table
+ spark.sql("DROP TABLE t1");
+
+ // file system table exists
+ assertThatCode(
+ () ->
+ FileStoreTableFactory.create(
+ LocalFileIO.create(),
+ new Path(
+ warehousePath,
+ String.format("%s.db/%s", "test_db", "t1"))))
+ .doesNotThrowAnyException();
+
+ spark.close();
+ }
}