This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris-spark-connector.git


The following commit(s) were added to refs/heads/master by this push:
     new 37a6498  [Improve] add schema less option (#332)
37a6498 is described below

commit 37a64981e116bb7abcf53b454d883479b90a846e
Author: wudi <[email protected]>
AuthorDate: Wed Jul 30 19:36:50 2025 +0800

    [Improve] add schema less option (#332)
---
 .../apache/doris/spark/config/DorisOptions.java    |  2 ++
 .../doris/spark/util/LoadBalanceListTest.java      |  8 ++---
 .../doris/spark/sql/DorisAnySchemaITCase.scala     | 39 ++++++++++++++++++++++
 .../doris/spark/catalog/DorisTableBase.scala       |  4 ++-
 4 files changed, 48 insertions(+), 5 deletions(-)

diff --git a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
index 0be7acf..5600411 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/main/java/org/apache/doris/spark/config/DorisOptions.java
@@ -69,6 +69,8 @@ public class DorisOptions {
 
     public static final ConfigOption<String> DORIS_WRITE_FIELDS = 
ConfigOptions.name("doris.write.fields").stringType().withoutDefaultValue().withDescription("");
 
+    public static final ConfigOption<Boolean> DORIS_WRITE_SCHEMA_LESS = 
ConfigOptions.name("doris.write.schemaless").booleanType().defaultValue(false).withDescription("");
+
     public static final ConfigOption<Integer> DORIS_SINK_BATCH_SIZE = 
ConfigOptions.name("doris.sink.batch.size").intType().defaultValue(500000).withDescription("");
 
     public static final ConfigOption<Integer> DORIS_SINK_MAX_RETRIES = 
ConfigOptions.name("doris.sink.max-retries").intType().defaultValue(0).withDescription("");
diff --git a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
index e4226e9..01f9303 100644
--- a/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
+++ b/spark-doris-connector/spark-doris-connector-base/src/test/java/org/apache/doris/spark/util/LoadBalanceListTest.java
@@ -41,7 +41,7 @@ public class LoadBalanceListTest {
                                if (index++ == 0) {
                                        testHeadSet.add(server);
                                }
-                               System.out.println(server);
+                               // System.out.println(server);
                        }
                        if (i % serverList.size() == 0) {
                                
Assert.assertTrue(testList.equals(Arrays.asList("server1", "server2", 
"server3")));
@@ -55,7 +55,7 @@ public class LoadBalanceListTest {
                                
Assert.assertTrue(testList.equals(Arrays.asList("server3", "server1", 
"server2")));
                        }
 
-                       System.out.println("---------");
+                       // System.out.println("---------");
                        Assert.assertTrue(testList.size() == serverList.size());
                }
                Assert.assertTrue(testHeadSet.size() == serverList.size());
@@ -80,9 +80,9 @@ public class LoadBalanceListTest {
                                if (++index > loadBalanceList.getList().size() 
- failedSet.size()) {
                                        
Assert.assertTrue(failedSet.contains(server));
                                }
-                               System.out.println(server);
+                               // System.out.println(server);
                        }
-                       System.out.println("---------");
+                       // System.out.println("---------");
                        Assert.assertTrue(serverSet.size() == 
serverList.size());
                }
        }
diff --git a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
index f516eaa..2d86f9e 100644
--- a/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
+++ b/spark-doris-connector/spark-doris-connector-it/src/test/java/org/apache/doris/spark/sql/DorisAnySchemaITCase.scala
@@ -76,6 +76,8 @@ class DorisAnySchemaITCase extends AbstractContainerTestBase {
    */
   val dorisPartialTable = "table5"
 
+  val dorisSchemaLessTable = "table_schema_less"
+
   @Test
   def jsonDataWriteTest(): Unit = {
     initializeTable(dorisTable, DataModel.UNIQUE)
@@ -316,6 +318,43 @@ class DorisAnySchemaITCase extends AbstractContainerTestBase {
     }
   }
 
+
+  @Test
+  def jsonDataWriteWithSchemaLess(): Unit = {
+    initializeTable(dorisSchemaLessTable, DataModel.UNIQUE)
+    val spark = SparkSession.builder().master("local[*]").getOrCreate()
+    try {
+      val df = spark.createDataFrame(Seq(
+        (0, 0, 100),
+        (1, 0, 100),
+        (3, 0, 200)
+      )).toDF("siteid", "citycode", "pv")
+      df.write
+        .format("doris")
+        .option("doris.fenodes", getFenodes)
+        .option("doris.table.identifier", DATABASE + "." + 
dorisSchemaLessTable)
+        .option("user", getDorisUsername)
+        .option("password", getDorisPassword)
+        .option("doris.sink.properties.format", "json")
+        .option("doris.write.schemaless", "true")
+        .option("sink.batch.size", 2)
+        .option("sink.max-retries", 2)
+        .mode(SaveMode.Append)
+        .save()
+      spark.stop()
+      Thread.sleep(2000)
+      val actual = ContainerUtils.executeSQLStatement(
+        getDorisQueryConnection,
+        LOG,
+        String.format("select * from %s.%s", DATABASE, dorisSchemaLessTable),
+        4)
+      val expected = util.Arrays.asList("0,0,,100", "1,0,,100", "3,0,,200");
+      checkResultInAnyOrder("jsonDataWriteWithSchemaLess", expected.toArray, 
actual.toArray)
+    } finally {
+      spark.stop()
+    }
+  }
+
   private def initializeTable(table: String, dataModel: DataModel): Unit = {
     val morProps = if (!(DataModel.UNIQUE_MOR == dataModel)) "" else 
",\"enable_unique_key_merge_on_write\" = \"false\""
     val model = if (dataModel == DataModel.UNIQUE_MOR) 
DataModel.UNIQUE.toString else dataModel.toString
diff --git a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
index 90da702..2df410b 100644
--- a/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
+++ b/spark-doris-connector/spark-doris-connector-spark-3-base/src/main/scala/org/apache/doris/spark/catalog/DorisTableBase.scala
@@ -51,7 +51,9 @@ abstract class DorisTableBase(identifier: Identifier, config: DorisConfig, schem
       STREAMING_WRITE,
       TRUNCATE)
     val properties = config.getSinkProperties
-    if (properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))) {
+    val partialColumnsEnabled = 
properties.containsKey(DorisOptions.PARTIAL_COLUMNS) && 
"true".equalsIgnoreCase(properties.get(DorisOptions.PARTIAL_COLUMNS))
+    val schemaLessEnabled = 
config.getValue(DorisOptions.DORIS_WRITE_SCHEMA_LESS)
+    if (partialColumnsEnabled || schemaLessEnabled) {
       capabilities += ACCEPT_ANY_SCHEMA
     }
     capabilities.asJava


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to