Copilot commented on code in PR #365:
URL: 
https://github.com/apache/doris-spark-connector/pull/365#discussion_r3464124228


##########
spark-doris-connector/spark-doris-connector-spark-4-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.write
+
+import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, 
StreamLoadProcessor}
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite 
with StreamingWrite {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWrite])
+
+  private val committer: DorisCommitter = 
config.getValue(DorisOptions.LOAD_MODE) match {
+    case "stream_load" => new StreamLoadProcessor(config, schema)
+    case "copy_into" => new CopyIntoProcessor(config, schema)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  private var lastCommittedEpoch: Option[Long] = None
+
+  private val committedEpochLock = new AnyRef
+
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): 
DataWriterFactory = {
+    new DorisDataWriterFactory(config, schema)
+  }
+
+  // for batch write
+  override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit 
= {
+    if (writerCommitMessages != null && writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.commit))
+    }
+  }
+
+  // for batch write
+  override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = 
{
+    LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
+    if (writerCommitMessages.exists(_ != null) && 
writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
+    }
+  }

Review Comment:
   `abort(writerCommitMessages)` logs `writerCommitMessages.length` before 
checking for null, which can throw an NPE (the `commit` path already guards 
against null). The subsequent `writerCommitMessages.exists(...)` call likewise 
dereferences the array before any null check. Also, the implementation should 
consistently filter out null elements before casting.



##########
spark-doris-connector/build.sh:
##########
@@ -207,6 +217,22 @@ if [[ $SPARK_VERSION =~ ^3.* && $SCALA_VERSION == 
"2.11.12" ]]; then
   exit 1
 fi
 
+if [[ $SPARK_VERSION =~ ^4.* && $SCALA_VERSION != "2.13.16" ]]; then
+  echo_r "Spark 4.x requires scala 2.13, will exit."
+  exit 1
+fi
+
+if [[ ! $SPARK_VERSION =~ ^4.* && $SCALA_VERSION == "2.13.16" ]]; then
+  echo_r "Scala 2.13 is only supported with Spark 4.x, will exit."
+  exit 1
+fi

Review Comment:
   The Spark 4.x / Scala check compares against the exact Scala patch version 
(2.13.16), which no longer matches the Scala version configured for Spark 4.1 
in the pom (2.13.17). This will incorrectly reject a valid Spark 4.x build 
selection.



##########
spark-doris-connector/build.sh:
##########
@@ -177,6 +183,8 @@ if [ ${ScalaVer} -eq 1 ]; then
     SCALA_VERSION="2.11.12"
 elif [ ${ScalaVer} -eq 2 ]; then
     SCALA_VERSION="2.12.18"
+elif [ ${ScalaVer} -eq 3 ]; then
+    SCALA_VERSION="2.13.16"
 fi

Review Comment:
   `build.sh` hard-codes Scala 2.13.16 for the new Scala 2.13 option, but the 
Spark 4.1 profile/pom uses Scala 2.13.17. This makes the build script 
inconsistent with the Maven build (and can lead to confusing dependency 
resolution differences).



##########
spark-doris-connector/spark-doris-connector-spark-4-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.write
+
+import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, 
StreamLoadProcessor}
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite 
with StreamingWrite {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWrite])
+
+  private val committer: DorisCommitter = 
config.getValue(DorisOptions.LOAD_MODE) match {
+    case "stream_load" => new StreamLoadProcessor(config, schema)
+    case "copy_into" => new CopyIntoProcessor(config, schema)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  private var lastCommittedEpoch: Option[Long] = None
+
+  private val committedEpochLock = new AnyRef
+
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): 
DataWriterFactory = {
+    new DorisDataWriterFactory(config, schema)
+  }
+
+  // for batch write
+  override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit 
= {
+    if (writerCommitMessages != null && writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.commit))
+    }
+  }
+
+  // for batch write
+  override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = 
{
+    LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
+    if (writerCommitMessages.exists(_ != null) && 
writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
+    }
+  }
+
+  override def useCommitCoordinator(): Boolean = true
+
+  override def createStreamingWriterFactory(physicalWriteInfo: 
PhysicalWriteInfo): StreamingDataWriterFactory = {
+    new DorisDataWriterFactory(config, schema)
+  }
+
+  // for streaming write
+  override def commit(epochId: Long, writerCommitMessages: 
Array[WriterCommitMessage]): Unit = {
+    committedEpochLock.synchronized {
+      if (lastCommittedEpoch.isEmpty || epochId > lastCommittedEpoch.get && 
writerCommitMessages.exists(_ != null)) {
+        
writerCommitMessages.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.commit))
+        lastCommittedEpoch = Some(epochId)
+      }
+    }
+  }
+
+  // for streaming write
+  override def abort(epochId: Long, writerCommitMessages: 
Array[WriterCommitMessage]): Unit = {
+    committedEpochLock.synchronized {
+      if ((lastCommittedEpoch.isEmpty || epochId > lastCommittedEpoch.get) && 
writerCommitMessages.exists(_ != null)) {
+        
writerCommitMessages.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
+      }
+    }
+  }

Review Comment:
   Streaming `abort` checks `writerCommitMessages.exists(_ != null)` but then 
iterates the full array without filtering nulls. If Spark includes null slots 
(common for partitions that didn't write), this will throw a 
NullPointerException: the `asInstanceOf` cast passes a null element through, 
but the subsequent `.commitMessages` access on it fails.



##########
spark-doris-connector/spark-doris-connector-spark-4-base/src/main/scala/org/apache/doris/spark/write/DorisWrite.scala:
##########
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.spark.write
+
+import org.apache.doris.spark.client.write.{CopyIntoProcessor, DorisCommitter, 
StreamLoadProcessor}
+import org.apache.doris.spark.config.{DorisConfig, DorisOptions}
+import 
org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, 
StreamingWrite}
+import org.apache.spark.sql.connector.write.{BatchWrite, DataWriterFactory, 
PhysicalWriteInfo, WriterCommitMessage}
+import org.apache.spark.sql.types.StructType
+import org.slf4j.LoggerFactory
+
+class DorisWrite(config: DorisConfig, schema: StructType) extends BatchWrite 
with StreamingWrite {
+
+  private val LOG = LoggerFactory.getLogger(classOf[DorisWrite])
+
+  private val committer: DorisCommitter = 
config.getValue(DorisOptions.LOAD_MODE) match {
+    case "stream_load" => new StreamLoadProcessor(config, schema)
+    case "copy_into" => new CopyIntoProcessor(config, schema)
+    case _ => throw new IllegalArgumentException()
+  }
+
+  private var lastCommittedEpoch: Option[Long] = None
+
+  private val committedEpochLock = new AnyRef
+
+  override def createBatchWriterFactory(physicalWriteInfo: PhysicalWriteInfo): 
DataWriterFactory = {
+    new DorisDataWriterFactory(config, schema)
+  }
+
+  // for batch write
+  override def commit(writerCommitMessages: Array[WriterCommitMessage]): Unit 
= {
+    if (writerCommitMessages != null && writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.commit))
+    }
+  }
+
+  // for batch write
+  override def abort(writerCommitMessages: Array[WriterCommitMessage]): Unit = 
{
+    LOG.info("writerCommitMessages size: " + writerCommitMessages.length)
+    if (writerCommitMessages.exists(_ != null) && 
writerCommitMessages.nonEmpty) {
+      writerCommitMessages.filter(_ != null)
+        
.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.abort))
+    }
+  }
+
+  override def useCommitCoordinator(): Boolean = true
+
+  override def createStreamingWriterFactory(physicalWriteInfo: 
PhysicalWriteInfo): StreamingDataWriterFactory = {
+    new DorisDataWriterFactory(config, schema)
+  }
+
+  // for streaming write
+  override def commit(epochId: Long, writerCommitMessages: 
Array[WriterCommitMessage]): Unit = {
+    committedEpochLock.synchronized {
+      if (lastCommittedEpoch.isEmpty || epochId > lastCommittedEpoch.get && 
writerCommitMessages.exists(_ != null)) {
+        
writerCommitMessages.foreach(_.asInstanceOf[DorisWriterCommitMessage].commitMessages.foreach(committer.commit))
+        lastCommittedEpoch = Some(epochId)
+      }
+    }
+  }

Review Comment:
   In streaming `commit`, operator precedence makes the 
`writerCommitMessages.exists(_ != null)` guard apply only when 
`lastCommittedEpoch` is defined, because `a || b && c` parses as 
`a || (b && c)`. When `lastCommittedEpoch` is empty (first epoch), this will 
always enter the block and can throw an NPE if `writerCommitMessages` is null 
(or if it contains nulls, since the foreach doesn't filter). Parenthesizing the 
epoch check, as the streaming `abort` already does, fixes the precedence issue.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to