This is an automated email from the ASF dual-hosted git repository.
diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git
The following commit(s) were added to refs/heads/master by this push:
new f843e0c [fix](connector) Fix catalog config check (#242)
f843e0c is described below
commit f843e0c13eeadf736e6120645947b19ddc9c09b8
Author: gnehil <[email protected]>
AuthorDate: Fri Dec 27 16:34:58 2024 +0800
[fix](connector) Fix catalog config check (#242)
* fix unstable unit tests
* fix Doris catalog init failing the table identifier config check
---
.../spark/client/read/DorisFlightSqlReader.java | 5 +---
.../org/apache/doris/spark/config/DorisConfig.java | 33 +++++++++++++---------
.../apache/doris/spark/config/DorisOptions.java | 2 +-
.../apache/doris/spark/rdd/AbstractDorisRDD.scala | 2 +-
.../doris/spark/sql/sources/DorisRelation.scala | 2 +-
.../doris/spark/sql/DorisSourceProvider.scala | 4 +--
.../apache/doris/spark/catalog/DorisTable.scala | 6 ++--
.../doris/spark/catalog/DorisTableCatalog.scala | 4 +--
.../spark/sql/sources/DorisDataSourceV2.scala | 4 +--
9 files changed, 33 insertions(+), 29 deletions(-)
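
In short: a Doris catalog is initialized before any table is known, so DorisConfig.fromMap gains an ignoreTableCheck flag that the catalog sets to true and every per-table entry point sets to false. A sketch of the two call paths (names as in this patch; the option maps are assumed to be in scope):

    // catalog initialize(): no doris.table.identifier yet, so skip the check
    val catalogConf = DorisConfig.fromMap(caseInsensitiveStringMap, true)
    // data source / RDD paths: the identifier is still required, in db.table form
    val readConf = DorisConfig.fromMap(options, false)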
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
index 88a6696..4561f40 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/client/read/DorisFlightSqlReader.java
@@ -159,9 +159,7 @@ public class DorisFlightSqlReader extends DorisReader {
                 .collect(Collectors.toMap(Field::getName, Function.identity()));
         String[] readColumns = partition.getReadColumns();
         List<Field> newFieldList = new ArrayList<>();
-        int offset = 0;
-        for (int i = 0; i < readColumns.length; i++) {
-            String readColumn = readColumns[i];
+        for (String readColumn : readColumns) {
             if (!fieldTypeMap.containsKey(readColumn) && readColumn.contains(" AS ")) {
                 int asIdx = readColumn.indexOf(" AS ");
                 String realColumn = readColumn.substring(asIdx + 4).trim().replaceAll("`", "");
@@ -169,7 +167,6 @@ public class DorisFlightSqlReader extends DorisReader {
                         && ("BITMAP".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType())
                         || "HLL".equalsIgnoreCase(fieldTypeMap.get(realColumn).getType()))) {
                     newFieldList.add(new Field(realColumn, TPrimitiveType.VARCHAR.name(), null, 0, 0, null));
-                    offset++;
                 }
             } else {
                 newFieldList.add(fieldTypeMap.get(readColumn.trim().replaceAll("`", "")));
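
For reference, the alias handling above applies to read columns of the form "expr AS `col`". A minimal sketch of the extraction, using a made-up input (it mirrors the Java loop; it is not code from this patch):

    val readColumn = "bitmap_to_string(`uv`) AS `uv`"   // hypothetical sample
    val asIdx = readColumn.indexOf(" AS ")
    val realColumn = readColumn.substring(asIdx + 4).trim.replaceAll("`", "")
    // realColumn == "uv"; if its Doris type is BITMAP or HLL, the reader now
    // surfaces it as a VARCHAR field instead of failing the schema lookup.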
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
index 1d04b70..d8166f8 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisConfig.java
@@ -29,17 +29,20 @@ import java.util.Map;
 public class DorisConfig implements Serializable {
 
-    private Map<String, String> configOptions;
     private final String DORIS_REQUEST_AUTH_USER = "doris.request.auth.user";
     private final String DORIS_REQUEST_AUTH_PASSWORD = "doris.request.auth.password";
 
+    private Map<String, String> configOptions;
+    private boolean ignoreTableCheck;
+
     // only for test
     public DorisConfig() {
         configOptions = Collections.emptyMap();
     }
 
-    private DorisConfig(Map<String, String> options) throws OptionRequiredException {
+    private DorisConfig(Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
         this.configOptions = new HashMap<>(processOptions(options));
+        this.ignoreTableCheck = ignoreTableCheck;
         checkOptions(this.configOptions);
     }
@@ -87,14 +90,16 @@ public class DorisConfig implements Serializable {
                 throw new IllegalArgumentException("option [" + DorisOptions.DORIS_FENODES.getName() + "] is not in correct format, for example: host:port[,host2:port]");
             }
         }
-        if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
-            throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-        } else {
-            String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
-            if (tableIdentifier.isEmpty()) {
-                throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
-            } else if (!tableIdentifier.contains(".")) {
-                throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
+        if (!ignoreTableCheck) {
+            if (!options.containsKey(DorisOptions.DORIS_TABLE_IDENTIFIER.getName())) {
+                throw new OptionRequiredException(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+            } else {
+                String tableIdentifier = options.get(DorisOptions.DORIS_TABLE_IDENTIFIER.getName());
+                if (tableIdentifier.isEmpty()) {
+                    throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is empty");
+                } else if (!tableIdentifier.contains(".")) {
+                    throw new IllegalArgumentException("option [" + DorisOptions.DORIS_TABLE_IDENTIFIER.getName() + "] is not in correct format, for example: db.table");
+                }
             }
         }
         if (!options.containsKey(DorisOptions.DORIS_USER.getName())) {
@@ -176,16 +181,16 @@ public class DorisConfig implements Serializable {
         return new HashMap<>(configOptions);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap) throws OptionRequiredException {
-        return fromMap(sparkConfMap, Collections.emptyMap());
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Boolean ignoreTableCheck) throws OptionRequiredException {
+        return fromMap(sparkConfMap, Collections.emptyMap(), ignoreTableCheck);
     }
 
-    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options) throws OptionRequiredException {
+    public static DorisConfig fromMap(Map<String, String> sparkConfMap, Map<String, String> options, Boolean ignoreTableCheck) throws OptionRequiredException {
         Map<String, String> map = new HashMap<>(sparkConfMap);
         if (MapUtils.isNotEmpty(options)) {
             map.putAll(options);
         }
-        return new DorisConfig(map);
+        return new DorisConfig(map, ignoreTableCheck);
     }
 }
\ No newline at end of file
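
Net effect on validation, as a hedged sketch (option keys assumed from DorisOptions; values are placeholders):

    import scala.collection.JavaConverters._
    val opts = Map(
      "doris.fenodes" -> "127.0.0.1:8030",   // host:port[,host2:port]
      "doris.user" -> "root",
      "doris.password" -> ""
    ).asJava
    // strict path: missing doris.table.identifier -> OptionRequiredException
    // DorisConfig.fromMap(opts, false)
    // catalog path: table check skipped; the identifier is supplied later per table
    val cfg = DorisConfig.fromMap(opts, true)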
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 6d4f9ff..455c14b 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -110,7 +110,7 @@ public class DorisOptions {
     public static final ConfigOption<String> DORIS_HTTPS_KEY_STORE_PASSWORD = ConfigOptions.name("doris.https.key-store-password").stringType().withoutDefaultValue().withDescription("");
 
-    public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.load.mode").stringType().defaultValue("stream_load").withDescription("");
+    public static final ConfigOption<String> LOAD_MODE = ConfigOptions.name("doris.sink.mode").stringType().defaultValue("stream_load").withDescription("");
 
     public static final ConfigOption<String> READ_MODE = ConfigOptions.name("doris.read.mode").stringType().defaultValue("thrift").withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
index 54598eb..bb9008a 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/rdd/AbstractDorisRDD.scala
@@ -49,7 +49,7 @@ protected[spark] abstract class AbstractDorisRDD[T: ClassTag](
   /**
    * doris configuration get from rdd parameters and spark conf.
    */
-  @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava)
+  @transient private[spark] lazy val dorisCfg = DorisConfig.fromMap(sc.getConf.getAll.toMap.asJava, params.asJava, false)
 
   @transient private[spark] lazy val dorisPartitions = ReaderPartitionGenerator.generatePartitions(dorisCfg)
 }
diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
index 070629f..55249e9 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/sql/sources/DorisRelation.scala
@@ -35,7 +35,7 @@ private[sql] class DorisRelation(
     val sqlContext: SQLContext, parameters: Map[String, String])
   extends BaseRelation with TableScan with PrunedScan with PrunedFilteredScan with InsertableRelation {
 
-  private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava)
+  private lazy val cfg = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false)
 
   private lazy val inValueLengthLimit = cfg.getValue(DorisOptions.DORIS_FILTER_QUERY_IN_MAX_COUNT)
diff --git a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
index a989359..86d18dc 100644
--- a/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-2/src/main/scala/org/apache/doris/spark/sql/DorisSourceProvider.scala
@@ -54,7 +54,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait
                               mode: SaveMode, parameters: Map[String, String],
                               data: DataFrame): BaseRelation = {
-    val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava)
+    val config = DorisConfig.fromMap(sqlContext.sparkContext.getConf.getAll.toMap.asJava, parameters.asJava, false)
     mode match {
       case SaveMode.Overwrite =>
@@ -85,7 +85,7 @@ private[sql] class DorisSourceProvider extends DorisSourceRegisterTrait
   }
 
   override def createSink(sqlContext: SQLContext, parameters: Map[String, String], partitionColumns: Seq[String], outputMode: OutputMode): Sink = {
-    new DorisStreamLoadSink(sqlContext, DorisConfig.fromMap(Utils.params(parameters, logger).asJava))
+    new DorisStreamLoadSink(sqlContext, DorisConfig.fromMap(Utils.params(parameters, logger).asJava, false))
   }
 
   private def truncateTable(config: DorisConfig): Unit = {
diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
index 6a890a8..d9aca8a 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTable.scala
@@ -18,7 +18,7 @@
 package org.apache.doris.spark.catalog
 
 import org.apache.doris.spark.client.DorisFrontendClient
-import org.apache.doris.spark.config.DorisConfig
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
 import org.apache.doris.spark.read.DorisScanBuilder
 import org.apache.doris.spark.rest.models.Schema
 import org.apache.doris.spark.util.SchemaConvertors
@@ -53,10 +53,12 @@ class DorisTable(identifier: Identifier, config: DorisConfig, schema: Option[Str
   }
 
   override def newScanBuilder(caseInsensitiveStringMap: CaseInsensitiveStringMap): ScanBuilder = {
-    new DorisScanBuilder(config: DorisConfig, schema())
+    config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
+    new DorisScanBuilder(config, schema())
   }
 
   override def newWriteBuilder(logicalWriteInfo: LogicalWriteInfo): WriteBuilder = {
+    config.setProperty(DorisOptions.DORIS_TABLE_IDENTIFIER, name())
     new DorisWriteBuilder(config, logicalWriteInfo.schema())
   }
diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
index b953c24..9ca7f02 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/catalog/DorisTableCatalog.scala
@@ -44,7 +44,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog {
   override def initialize(name: String, caseInsensitiveStringMap: CaseInsensitiveStringMap): Unit = {
     assert(catalogName.isEmpty, "The Doris table catalog is already initialed")
     catalogName = Some(name)
-    dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap)
+    dorisConfig = DorisConfig.fromMap(caseInsensitiveStringMap, true)
     frontend = new DorisFrontendClient(dorisConfig)
   }
@@ -55,7 +55,7 @@ class DorisTableCatalog extends DorisTableCatalogBase with TableCatalog {
   override def loadTable(identifier: Identifier): Table = {
     checkIdentifier(identifier)
     new DorisTable(identifier, DorisConfig.fromMap((dorisConfig.toMap.asScala +
-      (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava), None)
+      (DorisOptions.DORIS_TABLE_IDENTIFIER.getName -> getFullTableName(identifier))).asJava, false), None)
   }
 
   override def createTable(identifier: Identifier, structType: StructType, transforms: Array[Transform], map: util.Map[String, String]): Table = throw new UnsupportedOperationException()
diff --git a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
index f7fbc2a..4fad7ed 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3/src/main/scala/org/apache/doris/spark/sql/sources/DorisDataSourceV2.scala
@@ -38,7 +38,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
   override def getTable(schema: StructType, partitioning: Array[Transform], properties: util.Map[String, String]): Table = {
     if (t != null) t
     else {
-      val dorisConfig = DorisConfig.fromMap(properties)
+      val dorisConfig = DorisConfig.fromMap(properties, false)
       val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, Some(schema))
@@ -48,7 +48,7 @@ class DorisDataSourceV2 extends DorisDataSource with TableProvider {
   private def getTable(options: CaseInsensitiveStringMap): Table = {
     if (t != null) t
     else {
-      val dorisConfig = DorisConfig.fromMap(options)
+      val dorisConfig = DorisConfig.fromMap(options, false)
       val tableIdentifier = dorisConfig.getValue(DorisOptions.DORIS_TABLE_IDENTIFIER)
       val tableIdentifierArr = tableIdentifier.split("\\.")
       new DorisTable(Identifier.of(Array[String](tableIdentifierArr(0)), tableIdentifierArr(1)), dorisConfig, None)
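
The catalog path this patch unblocks, as a sketch (catalog name, host, and credentials are placeholders; registering the catalog drives initialize(), which now calls DorisConfig.fromMap(map, true) because no table identifier exists yet):

    import org.apache.spark.sql.SparkSession
    val spark = SparkSession.builder()
      .config("spark.sql.catalog.doris_catalog", "org.apache.doris.spark.catalog.DorisTableCatalog")
      .config("spark.sql.catalog.doris_catalog.doris.fenodes", "127.0.0.1:8030")
      .config("spark.sql.catalog.doris_catalog.doris.user", "root")
      .config("spark.sql.catalog.doris_catalog.doris.password", "")
      .getOrCreate()
    // loadTable() then builds a per-table config with the identifier filled in
    spark.sql("SELECT * FROM doris_catalog.some_db.some_table").show()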