This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 705507f  [SPARK-27494][SS] Null values don't work in Kafka source v2
705507f is described below

commit 705507facda11060f1a0beb04d1dd19bda5fc4f3
Author: uncleGen <husty...@gmail.com>
AuthorDate: Fri Apr 26 14:25:31 2019 +0800

    [SPARK-27494][SS] Null values don't work in Kafka source v2
    
    ## What changes were proposed in this pull request?
    
    Right now Kafka source v2 doesn't support null values. The issue is in
    org.apache.spark.sql.kafka010.KafkaRecordToUnsafeRowConverter.toUnsafeRow,
    which doesn't handle null values (a sketch of the patched method follows
    the commit message below).
    
    ## How was this patch tested?
    
    Added new unit tests.
    
    Closes #24441 from uncleGen/SPARK-27494.
    
    Authored-by: uncleGen <husty...@gmail.com>
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
    (cherry picked from commit d2656aaecd4a7b5562d8d2065aaa66fdc72d253d)
    Signed-off-by: Wenchen Fan <wenc...@databricks.com>
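
For context, a minimal sketch of the converter method as it reads after this
patch, reconstructed from the first hunk below (columns past record.offset and
the final getRow call are elided here, as they are unchanged by this commit):

    def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
      rowWriter.reset()
      rowWriter.zeroOutNullBytes()

      // A Kafka record may legally carry a null key, a null value (a
      // "tombstone" in a log-compacted topic), or both; each is mapped to
      // a SQL NULL rather than handed to write(), which cannot take null.
      if (record.key == null) {
        rowWriter.setNullAt(0)
      } else {
        rowWriter.write(0, record.key)
      }
      if (record.value == null) {
        rowWriter.setNullAt(1)
      } else {
        rowWriter.write(1, record.value)
      }
      rowWriter.write(2, UTF8String.fromString(record.topic))
      rowWriter.write(3, record.partition)
      rowWriter.write(4, record.offset)
      // ... remaining columns unchanged (not shown in the hunk)
    }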
---
 .../kafka010/KafkaRecordToUnsafeRowConverter.scala |  7 ++-
 .../sql/kafka010/KafkaContinuousSourceSuite.scala  |  4 ++
 .../sql/kafka010/KafkaMicroBatchSourceSuite.scala  | 58 ++++++++++++++++++++++
 3 files changed, 68 insertions(+), 1 deletion(-)

diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
index f35a143..306ef10 100644
--- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
+++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRecordToUnsafeRowConverter.scala
@@ -30,13 +30,18 @@ private[kafka010] class KafkaRecordToUnsafeRowConverter {
 
   def toUnsafeRow(record: ConsumerRecord[Array[Byte], Array[Byte]]): UnsafeRow = {
     rowWriter.reset()
+    rowWriter.zeroOutNullBytes()
 
     if (record.key == null) {
       rowWriter.setNullAt(0)
     } else {
       rowWriter.write(0, record.key)
     }
-    rowWriter.write(1, record.value)
+    if (record.value == null) {
+      rowWriter.setNullAt(1)
+    } else {
+      rowWriter.write(1, record.value)
+    }
     rowWriter.write(2, UTF8String.fromString(record.topic))
     rowWriter.write(3, record.partition)
     rowWriter.write(4, record.offset)
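
Note on the zeroOutNullBytes() call added above: the UnsafeRowWriter instance
is reused across records, and reset() only repositions the write cursor; per
the UnsafeRowWriter contract, it does not clear the null-tracking bits. Once
setNullAt() can be called for one record, the bits must be cleared before the
next record is written, or a null would leak into the rows that follow. A
sketch of the hazard, with keyBytes as a stand-in byte array:

    // First record: a null key sets the null bit for field 0.
    rowWriter.reset()
    rowWriter.setNullAt(0)
    val row1 = rowWriter.getRow          // field 0 is NULL, as intended

    // Next record: reset() alone moves the cursor but leaves the null
    // bit set, so field 0 would still read as NULL despite the write.
    rowWriter.reset()
    rowWriter.write(0, keyBytes)
    // Fix: call rowWriter.zeroOutNullBytes() right after reset().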
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
index a0e5818..649cb72 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -169,6 +169,10 @@ class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuo
       }
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(ContinuousTrigger(100))
+  }
 }
 
 class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
index 34cf335..da92019 100644
--- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
@@ -988,6 +988,10 @@ abstract class KafkaMicroBatchSourceSuiteBase extends KafkaSourceSuiteBase {
       q.stop()
     }
   }
+
+  test("SPARK-27494: read kafka record containing null key/values.") {
+    testNullableKeyValue(Trigger.ProcessingTime(100))
+  }
 }
 
 
@@ -1461,6 +1465,60 @@ abstract class KafkaSourceSuiteBase extends KafkaSourceTest {
       CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17)
     )
   }
+
+  protected def testNullableKeyValue(trigger: Trigger): Unit = {
+    val table = "kafka_null_key_value_source_test"
+    withTable(table) {
+      val topic = newTopic()
+      testUtils.createTopic(topic)
+      testUtils.withTranscationalProducer { producer =>
+        val df = spark
+          .readStream
+          .format("kafka")
+          .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+          .option("kafka.isolation.level", "read_committed")
+          .option("startingOffsets", "earliest")
+          .option("subscribe", topic)
+          .load()
+          .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+          .as[(String, String)]
+
+        val q = df
+          .writeStream
+          .format("memory")
+          .queryName(table)
+          .trigger(trigger)
+          .start()
+        try {
+          var idx = 0
+          producer.beginTransaction()
+          val expected1 = Seq.tabulate(5) { _ =>
+            producer.send(new ProducerRecord[String, String](topic, null, null)).get()
+            (null, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected2 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, idx.toString, null)).get()
+            (idx.toString, null)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          val expected3 = Seq.tabulate(5) { _ =>
+            idx += 1
+            producer.send(new ProducerRecord[String, String](topic, null, idx.toString)).get()
+            (null, idx.toString)
+          }.asInstanceOf[Seq[(String, String)]]
+
+          producer.commitTransaction()
+          eventually(timeout(streamingTimeout)) {
+            checkAnswer(spark.table(table), (expected1 ++ expected2 ++ expected3).toDF())
+          }
+        } finally {
+          q.stop()
+        }
+      }
+    }
+  }
 }
 
 object KafkaSourceSuite {


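With this fix in place, tombstone records (null values, as produced by log
compaction or by producers signalling deletion of a key) surface as SQL NULLs
instead of failing the v2 read path. A usage sketch, assuming an active
SparkSession named spark; the broker address and topic name are placeholders:

    import org.apache.spark.sql.functions.col

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "broker:9092") // placeholder
      .option("subscribe", "some-topic")                // placeholder
      .load()
      .filter(col("value").isNotNull)                   // drop tombstones
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")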