This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 2db0281696 [core] Skip case checking in catalog (#4730)
2db0281696 is described below

commit 2db0281696570a6909637854a682e56f19f2c512
Author: Zouxxyy <[email protected]>
AuthorDate: Wed Dec 18 19:06:18 2024 +0800

    [core] Skip case checking in catalog (#4730)
---
 .../org/apache/paimon/catalog/AbstractCatalog.java | 30 ------------
 .../org/apache/paimon/catalog/CatalogUtils.java    | 25 ----------
 .../paimon/catalog/FileSystemCatalogTest.java      | 11 +----
 .../org/apache/paimon/jdbc/JdbcCatalogTest.java    | 16 ++-----
 .../flink/action/cdc/SyncDatabaseActionBase.java   |  8 ----
 .../flink/action/cdc/SyncTableActionBase.java      |  7 ---
 .../action/cdc/SynchronizationActionBase.java      |  4 --
 .../org/apache/paimon/hive/HiveCatalogTest.java    | 17 +++----
 .../apache/paimon/hive/HiveCatalogITCaseBase.java  | 48 ++++---------------
 .../java/org/apache/paimon/spark/SparkCatalog.java | 16 ++-----
 .../apache/paimon/spark/SparkGenericCatalog.java   | 13 +----
 .../spark/SparkGenericCatalogWithHiveTest.java     | 56 +---------------------
 .../spark/sql/DDLWithHiveCatalogTestBase.scala     | 47 ++++++++++++++++++
 13 files changed, 73 insertions(+), 225 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
index 51cb346d4b..d7447c37dd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/AbstractCatalog.java
@@ -48,7 +48,6 @@ import javax.annotation.Nullable;
 
 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -282,8 +281,6 @@ public abstract class AbstractCatalog implements Catalog {
             throws TableAlreadyExistException, DatabaseNotExistException {
         checkNotBranch(identifier, "createTable");
         checkNotSystemTable(identifier, "createTable");
-        validateIdentifierNameCaseInsensitive(identifier);
-        validateFieldNameCaseInsensitive(schema.rowType().getFieldNames());
         validateAutoCreateClose(schema.options());
         validateCustomTablePath(schema.options());
 
@@ -339,7 +336,6 @@ public abstract class AbstractCatalog implements Catalog {
         checkNotBranch(toTable, "renameTable");
         checkNotSystemTable(fromTable, "renameTable");
         checkNotSystemTable(toTable, "renameTable");
-        validateIdentifierNameCaseInsensitive(toTable);
 
         try {
             getTable(fromTable);
@@ -366,8 +362,6 @@ public abstract class AbstractCatalog implements Catalog {
             Identifier identifier, List<SchemaChange> changes, boolean 
ignoreIfNotExists)
             throws TableNotExistException, ColumnAlreadyExistException, 
ColumnNotExistException {
         checkNotSystemTable(identifier, "alterTable");
-        validateIdentifierNameCaseInsensitive(identifier);
-        validateFieldNameCaseInsensitiveInSchemaChange(changes);
 
         try {
             getTable(identifier);
@@ -571,30 +565,6 @@ public abstract class AbstractCatalog implements Catalog {
         }
     }
 
-    protected void validateIdentifierNameCaseInsensitive(Identifier 
identifier) {
-        CatalogUtils.validateCaseInsensitive(
-                caseSensitive(), "Database", identifier.getDatabaseName());
-        CatalogUtils.validateCaseInsensitive(caseSensitive(), "Table", 
identifier.getObjectName());
-    }
-
-    private void 
validateFieldNameCaseInsensitiveInSchemaChange(List<SchemaChange> changes) {
-        List<String> fieldNames = new ArrayList<>();
-        for (SchemaChange change : changes) {
-            if (change instanceof SchemaChange.AddColumn) {
-                SchemaChange.AddColumn addColumn = (SchemaChange.AddColumn) 
change;
-                fieldNames.addAll(Arrays.asList(addColumn.fieldNames()));
-            } else if (change instanceof SchemaChange.RenameColumn) {
-                SchemaChange.RenameColumn rename = (SchemaChange.RenameColumn) 
change;
-                fieldNames.add(rename.newName());
-            }
-        }
-        validateFieldNameCaseInsensitive(fieldNames);
-    }
-
-    protected void validateFieldNameCaseInsensitive(List<String> fieldNames) {
-        CatalogUtils.validateCaseInsensitive(caseSensitive(), "Field", 
fieldNames);
-    }
-
     private void validateAutoCreateClose(Map<String, String> options) {
         checkArgument(
                 !Boolean.parseBoolean(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
index bae23c6276..043da0504d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
+++ b/paimon-core/src/main/java/org/apache/paimon/catalog/CatalogUtils.java
@@ -21,14 +21,10 @@ package org.apache.paimon.catalog;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.schema.SchemaManager;
 
-import java.util.Arrays;
-import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import static org.apache.paimon.catalog.Catalog.TABLE_DEFAULT_OPTION_PREFIX;
 import static 
org.apache.paimon.options.OptionsUtils.convertToPropertiesPrefixKey;
-import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Utils for {@link Catalog}. */
 public class CatalogUtils {
@@ -64,25 +60,4 @@ public class CatalogUtils {
     public static Map<String, String> tableDefaultOptions(Map<String, String> 
options) {
         return convertToPropertiesPrefixKey(options, 
TABLE_DEFAULT_OPTION_PREFIX);
     }
-
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    public static void validateCaseInsensitive(
-            boolean caseSensitive, String type, String... names) {
-        validateCaseInsensitive(caseSensitive, type, Arrays.asList(names));
-    }
-
-    /** Validate database, table and field names must be lowercase when not 
case-sensitive. */
-    public static void validateCaseInsensitive(
-            boolean caseSensitive, String type, List<String> names) {
-        if (caseSensitive) {
-            return;
-        }
-        List<String> illegalNames =
-                names.stream().filter(f -> 
!f.equals(f.toLowerCase())).collect(Collectors.toList());
-        checkArgument(
-                illegalNames.isEmpty(),
-                String.format(
-                        "%s name %s cannot contain upper case in the catalog.",
-                        type, illegalNames));
-    }
 }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
index 7045daca8e..dcd27a91ed 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/catalog/FileSystemCatalogTest.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.catalog;
 
 import org.apache.paimon.fs.Path;
-import org.apache.paimon.options.CatalogOptions;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.types.DataTypes;
@@ -29,7 +28,6 @@ import 
org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
-import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for {@link FileSystemCatalog}. */
@@ -39,14 +37,13 @@ public class FileSystemCatalogTest extends CatalogTestBase {
     public void setUp() throws Exception {
         super.setUp();
         Options catalogOptions = new Options();
-        catalogOptions.set(CatalogOptions.CASE_SENSITIVE, false);
         catalog = new FileSystemCatalog(fileIO, new Path(warehouse), 
catalogOptions);
     }
 
     @Test
     public void testCreateTableCaseSensitive() throws Exception {
         catalog.createDatabase("test_db", false);
-        Identifier identifier = Identifier.create("test_db", "new_table");
+        Identifier identifier = Identifier.create("test_db", "new_TABLE");
         Schema schema =
                 Schema.newBuilder()
                         .column("Pk1", DataTypes.INT())
@@ -64,11 +61,7 @@ public class FileSystemCatalogTest extends CatalogTestBase {
                         .partitionKeys("Pk1", "pk2")
                         .primaryKey("Pk1", "pk2", "pk3")
                         .build();
-
-        // Create table throws Exception if using uppercase when 
'allow-upper-case' is false
-        assertThatExceptionOfType(IllegalArgumentException.class)
-                .isThrownBy(() -> catalog.createTable(identifier, schema, 
false))
-                .withMessage("Field name [Pk1, Col1] cannot contain upper case 
in the catalog.");
+        catalog.createTable(identifier, schema, false);
     }
 
     @Test
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java 
b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
index f01a46fd6b..51e2bf5c77 100644
--- a/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/jdbc/JdbcCatalogTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.jdbc;
 
+import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.CatalogTestBase;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.options.CatalogOptions;
@@ -87,7 +88,7 @@ public class JdbcCatalogTest extends CatalogTestBase {
     }
 
     @Test
-    public void testCheckIdentifierUpperCase() throws Exception {
+    public void testUpperCase() throws Exception {
         catalog.createDatabase("test_db", false);
         assertThatThrownBy(
                         () ->
@@ -95,17 +96,10 @@ public class JdbcCatalogTest extends CatalogTestBase {
                                         Identifier.create("TEST_DB", 
"new_table"),
                                         DEFAULT_TABLE_SCHEMA,
                                         false))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("Database name [TEST_DB] cannot contain upper case 
in the catalog.");
+                .isInstanceOf(Catalog.DatabaseNotExistException.class)
+                .hasMessage("Database TEST_DB does not exist.");
 
-        assertThatThrownBy(
-                        () ->
-                                catalog.createTable(
-                                        Identifier.create("test_db", 
"NEW_TABLE"),
-                                        DEFAULT_TABLE_SCHEMA,
-                                        false))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("Table name [NEW_TABLE] cannot contain upper case 
in the catalog.");
+        catalog.createTable(Identifier.create("test_db", "new_TABLE"), DEFAULT_TABLE_SCHEMA, false);
     }
 
     @Test
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
index fd9892c0f1..d6d85e59bb 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncDatabaseActionBase.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.flink.action.Action;
 import org.apache.paimon.flink.action.MultiTablesSinkMode;
 import org.apache.paimon.flink.sink.cdc.EventParser;
@@ -166,13 +165,6 @@ public abstract class SyncDatabaseActionBase extends 
SynchronizationActionBase {
         return this;
     }
 
-    @Override
-    protected void validateCaseSensitivity() {
-        CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", 
database);
-        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table prefix", 
tablePrefix);
-        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table suffix", 
tableSuffix);
-    }
-
     @Override
     protected FlatMapFunction<CdcSourceRecord, RichCdcMultiplexRecord> 
recordParse() {
         return syncJobHandler.provideRecordParser(
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
index d997fcc290..ae4f5346b2 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SyncTableActionBase.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.action.cdc;
 
 import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.catalog.Catalog;
-import org.apache.paimon.catalog.CatalogUtils;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.action.Action;
@@ -111,12 +110,6 @@ public abstract class SyncTableActionBase extends 
SynchronizationActionBase {
                 true);
     }
 
-    @Override
-    protected void validateCaseSensitivity() {
-        CatalogUtils.validateCaseInsensitive(caseSensitive, "Database", 
database);
-        CatalogUtils.validateCaseInsensitive(caseSensitive, "Table", table);
-    }
-
     @Override
     protected void beforeBuildingSourceSink() throws Exception {
         Identifier identifier = new Identifier(database, table);
diff --git 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
index ebc051171d..ce2766b941 100644
--- 
a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
+++ 
b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/SynchronizationActionBase.java
@@ -113,8 +113,6 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
 
         catalog.createDatabase(database, true);
 
-        validateCaseSensitivity();
-
         beforeBuildingSourceSink();
 
         DataStream<RichCdcMultiplexRecord> input =
@@ -125,8 +123,6 @@ public abstract class SynchronizationActionBase extends 
ActionBase {
         buildSink(input, parserFactory);
     }
 
-    protected abstract void validateCaseSensitivity();
-
     protected void beforeBuildingSourceSink() throws Exception {}
 
     protected Source<CdcSourceRecord, ?, ?> buildSource() {
diff --git 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
index e3b48f02a6..bf6eb02f3e 100644
--- 
a/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
+++ 
b/paimon-hive/paimon-hive-catalog/src/test/java/org/apache/paimon/hive/HiveCatalogTest.java
@@ -85,23 +85,18 @@ public class HiveCatalogTest extends CatalogTestBase {
     @Test
     public void testCheckIdentifierUpperCase() throws Exception {
         catalog.createDatabase("test_db", false);
-        assertThatThrownBy(
-                        () ->
-                                catalog.createTable(
-                                        Identifier.create("TEST_DB", 
"new_table"),
-                                        DEFAULT_TABLE_SCHEMA,
-                                        false))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("Database name [TEST_DB] cannot contain upper case 
in the catalog.");
-
+        assertThatThrownBy(() -> catalog.createDatabase("TEST_DB", false))
+                .isInstanceOf(Catalog.DatabaseAlreadyExistException.class)
+                .hasMessage("Database TEST_DB already exists.");
+        catalog.createTable(Identifier.create("TEST_DB", "new_table"), DEFAULT_TABLE_SCHEMA, false);
         assertThatThrownBy(
                         () ->
                                 catalog.createTable(
                                         Identifier.create("test_db", 
"NEW_TABLE"),
                                         DEFAULT_TABLE_SCHEMA,
                                         false))
-                .isInstanceOf(IllegalArgumentException.class)
-                .hasMessage("Table name [NEW_TABLE] cannot contain upper case 
in the catalog.");
+                .isInstanceOf(Catalog.TableAlreadyExistException.class)
+                .hasMessage("Table test_db.NEW_TABLE already exists.");
     }
 
     private static final String HADOOP_CONF_DIR =
diff --git 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
index 2266a8484d..c39b85cb3d 100644
--- 
a/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
+++ 
b/paimon-hive/paimon-hive-connector-common/src/test/java/org/apache/paimon/hive/HiveCatalogITCaseBase.java
@@ -577,8 +577,7 @@ public abstract class HiveCatalogITCaseBase {
                                 "  'uri' = '',",
                                 "  'warehouse' = '" + path + "',",
                                 "  'lock.enabled' = 'true',",
-                                "  'table.type' = 'EXTERNAL',",
-                                "  'allow-upper-case' = 'true'",
+                                "  'table.type' = 'EXTERNAL'",
                                 ")"))
                 .await();
         tEnv.executeSql("USE CATALOG paimon_catalog_01").await();
@@ -593,30 +592,6 @@ public abstract class HiveCatalogITCaseBase {
         tEnv.executeSql("DROP TABLE t").await();
         Path tablePath = new Path(path, "test_db.db/t");
         assertThat(tablePath.getFileSystem().exists(tablePath)).isTrue();
-
-        tEnv.executeSql(
-                        String.join(
-                                "\n",
-                                "CREATE CATALOG paimon_catalog_02 WITH (",
-                                "  'type' = 'paimon',",
-                                "  'metastore' = 'hive',",
-                                "  'uri' = '',",
-                                "  'warehouse' = '" + path + "',",
-                                "  'lock.enabled' = 'true',",
-                                "  'table.type' = 'EXTERNAL',",
-                                "  'allow-upper-case' = 'false'",
-                                ")"))
-                .await();
-        tEnv.executeSql("USE CATALOG paimon_catalog_02").await();
-        tEnv.executeSql("USE test_db").await();
-
-        // set case-sensitive = false would throw exception out
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                                "CREATE TABLE t1 ( aa INT, Bb 
STRING ) WITH ( 'file.format' = 'avro' )")
-                                        .await())
-                .isInstanceOf(RuntimeException.class);
     }
 
     @Test
@@ -1006,7 +981,8 @@ public abstract class HiveCatalogITCaseBase {
 
         // the target table name has upper case.
         assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO 
T1"))
-                .hasMessage("Table name [T1] cannot contain upper case in the 
catalog.");
+                .hasMessage(
+                        "Could not execute ALTER TABLE my_hive.test_db.t1 RENAME TO my_hive.test_db.T1");
 
         tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
 
@@ -1160,24 +1136,16 @@ public abstract class HiveCatalogITCaseBase {
 
     @Test
     public void testUpperCase() {
+        tEnv.executeSql("CREATE TABLE T (a INT, b STRING ) WITH ( 'file.format' = 'avro' )");
+        tEnv.executeSql(
+                "CREATE TABLE tT (A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro')");
         assertThatThrownBy(
                         () ->
                                 tEnv.executeSql(
-                                                "CREATE TABLE T ( a INT, b 
STRING ) WITH ( 'file.format' = 'avro' )")
-                                        .await())
-                .hasRootCauseMessage(
-                        String.format(
-                                "Table name [%s] cannot contain upper case in 
the catalog.", "T"));
-
-        assertThatThrownBy(
-                        () ->
-                                tEnv.executeSql(
-                                                "CREATE TABLE t (A INT, b 
STRING, C STRING) WITH ( 'file.format' = 'avro')")
+                                                "CREATE TABLE tt ( A INT, b STRING, C STRING) WITH ( 'file.format' = 'avro' )")
                                         .await())
                 .hasRootCauseMessage(
-                        String.format(
-                                "Field name %s cannot contain upper case in 
the catalog.",
-                                "[A, C]"));
+                        "Table (or view) test_db.tt already exists in Catalog my_hive.");
     }
 
     @Test
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
index 12023cb847..f32b87603f 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkCatalog.java
@@ -52,7 +52,6 @@ import 
org.apache.spark.sql.execution.datasources.v2.FileTable;
 import org.apache.spark.sql.execution.datasources.v2.csv.CSVTable;
 import org.apache.spark.sql.execution.datasources.v2.orc.OrcTable;
 import org.apache.spark.sql.execution.datasources.v2.parquet.ParquetTable;
-import org.apache.spark.sql.internal.SessionState;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 import org.apache.spark.sql.util.CaseInsensitiveStringMap;
@@ -70,7 +69,6 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
 import static org.apache.paimon.CoreOptions.TYPE;
 import static org.apache.paimon.TableType.FORMAT_TABLE;
-import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.spark.SparkCatalogOptions.DEFAULT_DATABASE;
 import static org.apache.paimon.spark.SparkTypeUtils.toPaimonType;
 import static org.apache.paimon.spark.util.OptionUtils.copyWithSQLConf;
@@ -91,18 +89,10 @@ public class SparkCatalog extends SparkBaseCatalog 
implements SupportFunction, S
     @Override
     public void initialize(String name, CaseInsensitiveStringMap options) {
         this.catalogName = name;
-        Map<String, String> newOptions = new 
HashMap<>(options.asCaseSensitiveMap());
-        SessionState sessionState = SparkSession.active().sessionState();
-
         CatalogContext catalogContext =
-                CatalogContext.create(Options.fromMap(options), 
sessionState.newHadoopConf());
-
-        // if spark is case-insensitive, set case-sensitive to catalog
-        if (!sessionState.conf().caseSensitiveAnalysis()) {
-            newOptions.put(CASE_SENSITIVE.key(), "true");
-        }
-        options = new CaseInsensitiveStringMap(newOptions);
-
+                CatalogContext.create(
+                        Options.fromMap(options),
+                        SparkSession.active().sessionState().newHadoopConf());
         this.catalog = CatalogFactory.createCatalog(catalogContext);
         this.defaultDatabase =
                 options.getOrDefault(DEFAULT_DATABASE.key(), 
DEFAULT_DATABASE.defaultValue());
diff --git 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
index b57228fa44..ac1543f2fe 100644
--- 
a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
+++ 
b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/SparkGenericCatalog.java
@@ -62,7 +62,6 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.concurrent.Callable;
 
-import static org.apache.paimon.options.CatalogOptions.CASE_SENSITIVE;
 import static org.apache.paimon.options.CatalogOptions.METASTORE;
 import static org.apache.paimon.options.CatalogOptions.WAREHOUSE;
 import static 
org.apache.paimon.spark.SparkCatalogOptions.CREATE_UNDERLYING_SESSION_CATALOG;
@@ -242,7 +241,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
         SparkSession sparkSession = SparkSession.active();
         SessionState sessionState = sparkSession.sessionState();
         Configuration hadoopConf = sessionState.newHadoopConf();
-        SparkConf sparkConf = new SparkConf();
         if (options.containsKey(METASTORE.key())
                 && options.get(METASTORE.key()).equalsIgnoreCase("hive")) {
             String uri = options.get(CatalogOptions.URI.key());
@@ -257,11 +255,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
                 }
             }
         }
-        if ("in-memory"
-                
.equals(sparkSession.conf().get(StaticSQLConf.CATALOG_IMPLEMENTATION().key()))) 
{
-            LOG.warn("InMemoryCatalog here may cause bad effect.");
-        }
-
         this.catalogName = name;
         this.sparkCatalog = new SparkCatalog();
 
@@ -273,6 +266,7 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
                 CREATE_UNDERLYING_SESSION_CATALOG.key(),
                 CREATE_UNDERLYING_SESSION_CATALOG.defaultValue())) {
             this.underlyingSessionCatalogEnabled = true;
+            SparkConf sparkConf = new SparkConf();
             for (Map.Entry<String, String> entry : options.entrySet()) {
                 sparkConf.set("spark.hadoop." + entry.getKey(), 
entry.getValue());
                 hadoopConf.set(entry.getKey(), entry.getValue());
@@ -330,11 +324,6 @@ public class SparkGenericCatalog extends SparkBaseCatalog 
implements CatalogExte
         } else {
             options.put(DEFAULT_DATABASE.key(), sessionCatalogDefaultDatabase);
         }
-
-        // if spark is case-insensitive, set case-sensitive to catalog
-        if (!sqlConf.caseSensitiveAnalysis()) {
-            options.put(CASE_SENSITIVE.key(), "true");
-        }
     }
 
     @Override
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
index 84ea1ab5cb..f3d4ba8789 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
+++ 
b/paimon-spark/paimon-spark-ut/src/test/java/org/apache/paimon/spark/SparkGenericCatalogWithHiveTest.java
@@ -31,7 +31,6 @@ import java.io.FileNotFoundException;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.junit.jupiter.api.Assertions.assertThrows;
 
 /** Base tests for spark read. */
 public class SparkGenericCatalogWithHiveTest {
@@ -48,59 +47,6 @@ public class SparkGenericCatalogWithHiveTest {
         testHiveMetastore.stop();
     }
 
-    @Test
-    public void testCreateTableCaseSensitive(@TempDir java.nio.file.Path 
tempDir) {
-        // firstly, we use hive metastore to creata table, and check the 
result.
-        Path warehousePath = new Path("file:" + tempDir.toString());
-        SparkSession spark =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
-                        // with case-sensitive false
-                        .config("spark.sql.caseSensitive", "false")
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog",
-                                SparkGenericCatalog.class.getName())
-                        .master("local[2]")
-                        .getOrCreate();
-
-        spark.sql("CREATE DATABASE IF NOT EXISTS my_db1");
-        spark.sql("USE my_db1");
-        spark.sql(
-                "CREATE TABLE IF NOT EXISTS t2 (a INT, Bb INT, c STRING) USING 
paimon TBLPROPERTIES"
-                        + " ('file.format'='avro')");
-
-        assertThat(
-                        spark.sql("SHOW TABLES").collectAsList().stream()
-                                .map(s -> s.get(1))
-                                .map(Object::toString))
-                .containsExactlyInAnyOrder("t2");
-        spark.close();
-
-        SparkSession spark1 =
-                SparkSession.builder()
-                        .config("spark.sql.warehouse.dir", 
warehousePath.toString())
-                        // with case-sensitive true
-                        .config("spark.sql.caseSensitive", "true")
-                        // with hive metastore
-                        .config("spark.sql.catalogImplementation", "hive")
-                        .config(
-                                "spark.sql.catalog.spark_catalog",
-                                SparkGenericCatalog.class.getName())
-                        .master("local[2]")
-                        .getOrCreate();
-
-        spark1.sql("USE my_db1");
-        assertThrows(
-                RuntimeException.class,
-                () ->
-                        spark1.sql(
-                                "CREATE TABLE IF NOT EXISTS t3 (a INT, Bb INT, 
c STRING) USING paimon TBLPROPERTIES"
-                                        + " ('file.format'='avro')"));
-        spark1.close();
-    }
-
     @Test
     public void testBuildWithHive(@TempDir java.nio.file.Path tempDir) {
         // firstly, we use hive metastore to create table, and check the 
result.
@@ -123,7 +69,7 @@ public class SparkGenericCatalogWithHiveTest {
                         + " ('file.format'='avro')");
 
         assertThat(spark.sql("SHOW 
NAMESPACES").collectAsList().stream().map(Object::toString))
-                .containsExactlyInAnyOrder("[default]", "[my_db]", "[my_db1]");
+                .containsExactlyInAnyOrder("[default]", "[my_db]");
 
         assertThat(
                         spark.sql("SHOW TABLES").collectAsList().stream()
diff --git 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
index d51cdce34c..5311c586a5 100644
--- 
a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
+++ 
b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DDLWithHiveCatalogTestBase.scala
@@ -553,6 +553,53 @@ abstract class DDLWithHiveCatalogTestBase extends 
PaimonHiveTestBase {
     }
   }
 
+  test("Paimon DDL with hive catalog: case sensitive") {
+    Seq(sparkCatalogName, paimonHiveCatalogName).foreach {
+      catalogName =>
+        Seq(false, true).foreach {
+          caseSensitive =>
+            withSparkSQLConf("spark.sql.caseSensitive" -> caseSensitive.toString) {
+              spark.sql(s"USE $catalogName")
+              withDatabase("paimon_case_sensitive_DB") {
+                spark.sql(s"CREATE DATABASE paimon_case_sensitive_DB")
+
+                // check create db
+                // note: db name is always lower case in hive
+                intercept[Exception](spark.sql("CREATE DATABASE paimon_case_sensitive_db"))
+
+                spark.sql(s"USE paimon_case_sensitive_DB")
+                withTable("tT", "tt") {
+                  spark.sql("CREATE TABLE tT (aA INT) USING paimon")
+                  spark.sql("INSERT INTO tT VALUES 1")
+
+                  // check select
+                  checkAnswer(spark.sql("SELECT aA FROM tT"), Row(1))
+                  if (caseSensitive) {
+                    intercept[Exception](spark.sql(s"SELECT aa FROM tT"))
+                  } else {
+                    checkAnswer(spark.sql("SELECT aa FROM tT"), Row(1))
+                  }
+
+                  // check alter table rename
+                  // note: table name is always lower case in hive
+                  intercept[Exception](spark.sql(s"ALTER TABLE tT RENAME TO tt"))
+
+                  // check alter table rename column
+                  // note: col name can be upper case in hive
+                  if (caseSensitive) {
+                    spark.sql("ALTER TABLE tT RENAME COLUMN aA TO aa")
+                    checkAnswer(spark.sql("SELECT aa FROM tT"), Row(1))
+                    intercept[Exception](spark.sql(s"SELECT aA FROM tT"))
+                  } else {
+                    intercept[Exception](spark.sql("ALTER TABLE tT RENAME COLUMN aA TO aa"))
+                  }
+                }
+              }
+            }
+        }
+    }
+  }
+
   def getDatabaseProp(dbName: String, propertyName: String): String = {
     spark
       .sql(s"DESC DATABASE EXTENDED $dbName")

Reply via email to