This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new f843e0c  [fix](connector) Fix catalog config check  (#242)
f843e0c is described below

commit f843e0c13eeadf736e6120645947b19ddc9c09b8
Author: gnehil <[email protected]>
AuthorDate: Fri Dec 27 16:34:58 2024 +0800

    [fix](connector) Fix catalog config check  (#242)
    
    * fix unstable unit tests
    
    * fix unstable unit tests
    
    * fix the table-identifier config check failing when the Doris catalog is initialized
---
 .../spark/client/read/DorisFlightSqlReader.java    |  5 +---
 .../org/apache/doris/spark/config/DorisConfig.java | 33 +++++++++++++---------
 .../apache/doris/spark/config/DorisOptions.java    |  2 +-
 .../apache/doris/spark/rdd/AbstractDorisRDD.scala  |  2 +-
 .../doris/spark/sql/sources/DorisRelation.scala    |  2 +-
 .../doris/spark/sql/DorisSourceProvider.scala      |  4 +--
 .../apache/doris/spark/catalog/DorisTable.scala    |  6 ++--
 .../doris/spark/catalog/DorisTableCatalog.scala    |  4 +--
 .../spark/sql/sources/DorisDataSourceV2.scala      |  4 +--
 9 files changed, 33 insertions(+), 29 deletions(-)

diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 88a6696..4561f40 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -159,9 +159,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 .collect(Collectors.toMap(Field::getName, 
Function.identity()));
         String[] readColumns = partition.getReadColumns();
         List<Field> newFieldList = new ArrayList<>();
-        int offset = 0;
-        for (int i = 0; i < readColumns.length; i++) {
-            String readColumn = readColumns[i];
+        for (String readColumn : readColumns) {
             if (!fieldTypeMap.containsKey(readColumn) && readColumn.contains(" 
AS ")) {
                 int asIdx = readColumn.indexOf(" AS ");
                 String realColumn = readColumn.substring(asIdx + 
4).trim().replaceAll("`", "");
@@ -169,7 +167,6 @@ public class DorisFlightSqlReader extends DorisReader {
                         && 
("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
                         || 
"HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
                     newFieldList.add(new Field(realColumn, 
TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
-                    offset++;
                 }
             } else {
                 
newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
index 1d04b70..d8166f8 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
@@ -29,17 +29,20 @@ import java.util.Map;
 
 public class DorisConfig implements Serializable {
 
-    private Map<String, String> configOptions;
     private final String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
     private final String DORIS_REQUEST_AUTH_PASSWORD = 
"doris.request.auth.password";
 
+    private Map<String, String> configOptions;
+    private boolean ignoreTableCheck;
+
     // only for test
     public DorisConfig() {
         configOptions = Collections.emptyMap();
     }
 
-    private DorisConfig(Map<String, String> options) throws 
OptionRequiredException {
+    private DorisConfig(Map<String, String> options, Boolean ignoreTableCheck) 
throws OptionRequiredException {
         this.configOptions = new HashMap<>(processOptions(options));
+        this.ignoreTableCheck = ignoreTableCheck;
         checkOptions(this.configOptions);
     }
 
@@ -87,14 +90,16 @@ public class DorisConfig implements Serializable {
                 throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for 
example: host:port[,host2:port]");
             }
         }
-        if 
(!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
-            throw new 
OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-        } else {
-            String tableIdentifier = 
options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-            if (tableIdentifier.isEmpty()) {
-                throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
-            } else if (!tableIdentifier.contains(".")) {
-                throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, 
for example: db.table");
+        if (!ignoreTableCheck) {
+            if 
(!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
+                throw new 
OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+            } else {
+                String tableIdentifier = 
options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+                if (tableIdentifier.isEmpty()) {
+                    throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
+                } else if (!tableIdentifier.contains(".")) {
+                    throw new IllegalArgumentException("option [" + 
DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, 
for example: db.table");
+                }
             }
         }
         if (!options.containsKey(DorisOptions.DORIS_USER.getName())) {
@@ -176,16 +181,16 @@ public class DorisConfig implements Serializable {
         return new HashMap<>(configOptions);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap) throws 
OptionRequiredException {
-        return fromMap(sparkConfMap, Collections.emptyMap());
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, 
Boolean ignoreTableCheck) throws OptionRequiredException {
+        return fromMap(sparkConfMap, Collections.emptyMap(), ignoreTableCheck);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap, 
Map<String, String> options) throws OptionRequiredException {
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, 
Map<String, String> options, Boolean ignoreTableCheck) throws 
OptionRequiredException {
         Map<String, String> map = new HashMap<>(sparkConfMap);
         if (MapUtils.isNotEmpty(options)) {
             map.putAll(options);
         }
-        return new DorisConfig(map);
+        return new DorisConfig(map, ignoreTableCheck);
     }
 
 }
\ No newline at end of file
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 6d4f9ff..455c14b 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -110,7 +110,7 @@ public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_HTTPS_KEY_STORE_PASSWORD = 
ConfigOptions.name("doris.https.key-store-password").stringType().withoutDefaultValue().withDescription("");
 
-    public static final ConfigOption<String> LOAD_MODE = 
ConfigOptions.name("doris.sink.load.mode").stringType().defaultValue("stream_load").withDescription("");
+    public static final ConfigOption<String> LOAD_MODE = 
ConfigOptions.name("doris.sink.mode").stringType().defaultValue("stream_load").withDescription("");
 
     public static final ConfigOption<String> READ_MODE = 
ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription("");
 
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index 54598eb..bb9008a 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -49,7 +49,7 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
   /**
    * doris configuration get from rdd parameters and spark conf.
    */
-  @transient private[spark] lazy val dorisCfg = 
DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava)
+  @transient private[spark] lazy val dorisCfg = 
DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)
 
   @transient private[spark] lazy val dorisPartitions = 
ReaderPartitionGenerator.generatePartitions(dorisCfg)
 }
diff --git 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
index 070629f..55249e9 100644
--- 
a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
+++ 
b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
@@ -35,7 +35,7 @@ private[sql] class DorisRelation(
                                   val sqlContext: SQLContext, parameters: 
Map[String, String])
   extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan 
with InsertableRelation {
 
-  private lazy val cfg = 
DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, 
parameters.asJava)
+  private lazy val cfg = 
DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, 
parameters.asJava, false)
 
   private lazy val inValueLengthLimit = 
cfg.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
 
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
 
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index a989359..86d18dc 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -54,7 +54,7 @@ private[sql] class DorisSourceProvider extends 
DorisSourceRegisterTrait
                               mode: SaveMode, parameters: Map[String, String],
                               data: DataFrame): BaseRelation = {
 
-    val config = 
DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, 
parameters.asJava)
+    val config = 
DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, 
parameters.asJava, false)
 
     mode match {
       case SaveMode.Overwrite =>
@@ -85,7 +85,7 @@ private[sql] class DorisSourceProvider extends 
DorisSourceRegisterTrait
   }
 
   override def createSink(sqlContext: SQLContext, parameters: Map[String, 
String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
-    new DorisStreamLoadSink(sqlContext, 
DorisConfig.fromMap(Utils.params(parameters, logger).asJava))
+    new DorisStreamLoadSink(sqlContext, 
DorisConfig.fromMap(Utils.params(parameters, logger).asJava, false))
   }
 
   private def truncateTable(config: DorisConfig): Unit = {
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
index 6a890a8..d9aca8a 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
@@ -18,7 +18,7 @@
 package org.apache.doris.spark.catalog
 
 import org.apache.doris.spark.client.DorisFrontendClient
-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.doris.spark.read.DorisScanBuilder
 import org.apache.doris.spark.rest.models.Schema
 import org.apache.doris.spark.util.SchemaConvertors
@@ -53,10 +53,12 @@ class DorisTable(identifier: Identifier, config: 
DorisConfig, schema: Option[Str
   }
 
   override def newScanBuilder(caseInsensitiveStringMap: 
CaseInsensitiveStringMap): ScanBuilder = {
-    new DorisScanBuilder(config: DorisConfig, schema())
+    config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
+    new DorisScanBuilder(config, schema())
   }
 
   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): 
WriteBuilder = {
+    config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
     new DorisWriteBuilder(config, logicalWriteInfo.schema())
   }
 
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
index b953c24..9ca7f02 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
@@ -44,7 +44,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with 
TableCatalog {
   override def initialize(name: String, caseInsensitiveStringMap: 
CaseInsensitiveStringMap): Unit = {
     assert(catalogName.isEmpty, "The Doris table catalog is already initialed")
     catalogName = Some(name)
-    dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap)
+    dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap, true)
     frontend = new DorisFrontendClient(dorisConfig)
   }
 
@@ -55,7 +55,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with 
TableCatalog {
   override def loadTable(identifier: Identifier): Table = {
     checkIdentifier(identifier)
     new DorisTable(identifier, DorisConfig.fromMap((dorisConfig.toMap.asScala +
-      (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> 
getFullTableName(identifier))).asJava), None)
+      (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> 
getFullTableName(identifier))).asJava, false), None)
   }
 
   override def createTable(identifier: Identifier, structType: StructType, 
transforms: Array[Transform], map: util.Map[String, String]): Table = throw new 
UnsupportedOperationException()
diff --git 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
index f7fbc2a..4fad7ed 100644
--- 
a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
+++ 
b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
@@ -38,7 +38,7 @@ class DorisDataSourceV2 extends DorisDataSource with 
TableProvider {
   override def getTable(schema: StructType, partitioning: Array[Transform], 
properties: util.Map[String, String]): Table = {
     if (t != null) t
     else {
-      val dorisConfig = DorisConfig.fromMap(properties)
+      val dorisConfig = DorisConfig.fromMap(properties, false)
       val tableIdentifier = 
dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), 
tableIdentifierArr(1)), dorisConfig, Some(schema))
@@ -48,7 +48,7 @@ class DorisDataSourceV2 extends DorisDataSource with 
TableProvider {
   private def getTable(options: CaseInsensitiveStringMap): Table = {
     if (t != null) t
     else {
-      val dorisConfig = DorisConfig.fromMap(options)
+      val dorisConfig = DorisConfig.fromMap(options, false)
       val tableIdentifier = 
dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), 
tableIdentifierArr(1)), dorisConfig, None)


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to