[SPARK-22908][SS] Roll forward continuous processing Kafka support with fix to continuous Kafka data reader

## What changes were proposed in this pull request?

The continuous Kafka data reader is now interruptible and closes its per-task consumer when it is done, so a reader blocked on Kafka can be torn down without leaking resources.
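
A condensed sketch of the pattern (see `KafkaContinuousDataReader.next()` and `close()` in the diff below; the `get` and `close0` parameters here only stand in for the real `CachedKafkaConsumer` calls and are illustrative, not a drop-in replacement for the patched class):

```scala
// Sketch only: poll with a short timeout and re-check the task's interrupt/completion
// flags between polls, so a reader blocked on Kafka can always be torn down, and close
// the per-task consumer explicitly so its sockets are released.
import java.util.concurrent.TimeoutException
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.spark.TaskContext

class InterruptibleKafkaRead(
    get: Long => ConsumerRecord[Array[Byte], Array[Byte]],  // wraps consumer.get(..., pollTimeoutMs, ...)
    close0: () => Unit) {                                    // wraps consumer.close()

  def nextRecord(offset: Long): Option[ConsumerRecord[Array[Byte], Array[Byte]]] = {
    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
    while (r == null) {
      val ctx = TaskContext.get()
      if (ctx.isInterrupted() || ctx.isCompleted()) return None  // interrupt point
      try {
        r = get(offset)              // returns regularly thanks to the short poll timeout
      } catch {
        case _: TimeoutException =>  // no data yet; loop and re-check the flags
      }
    }
    Some(r)
  }

  def close(): Unit = close0()       // called when the task finishes, freeing the consumer's sockets
}
```
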
## How was this patch tested?

I locally ran one of the KafkaContinuousSourceSuite tests in a tight loop. Before the fix, my machine ran out of open file descriptors after a few iterations; now it works fine.
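
A minimal, hypothetical harness (not part of this patch) for that kind of check: repeat a start/stop cycle and assert that the open-descriptor count stays bounded. It is Linux-only (counts entries in `/proc/self/fd`), and `runOneContinuousKafkaTest()` is just a placeholder for the suite test body.

```scala
// Hypothetical helper, not from the patch: run a body repeatedly and fail if the
// number of open file descriptors keeps growing between iterations.
import java.io.File

object FdLeakCheck {
  private def openFds(): Int = new File("/proc/self/fd").list().length  // Linux only

  def assertNoFdLeak(iterations: Int)(body: => Unit): Unit = {
    val before = openFds()
    (1 to iterations).foreach(_ => body)
    val after = openFds()
    // Allow some slack for lazily opened resources, but catch unbounded growth.
    assert(after - before < 50, s"possible file descriptor leak: $before -> $after")
  }
}

// Usage sketch: FdLeakCheck.assertNoFdLeak(100) { runOneContinuousKafkaTest() }
```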

Author: Jose Torres <j...@databricks.com>

Closes #20253 from jose-torres/fix-data-reader.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/16670578
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/16670578
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/16670578

Branch: refs/heads/master
Commit: 16670578519a7b787b0c63888b7d2873af12d5b9
Parents: a9b845e
Author: Jose Torres <j...@databricks.com>
Authored: Tue Jan 16 18:11:27 2018 -0800
Committer: Tathagata Das <tathagata.das1...@gmail.com>
Committed: Tue Jan 16 18:11:27 2018 -0800

----------------------------------------------------------------------
 .../sql/kafka010/KafkaContinuousReader.scala    | 260 +++++++++
 .../sql/kafka010/KafkaContinuousWriter.scala    | 119 ++++
 .../spark/sql/kafka010/KafkaOffsetReader.scala  |  21 +-
 .../apache/spark/sql/kafka010/KafkaSource.scala |  17 +-
 .../spark/sql/kafka010/KafkaSourceOffset.scala  |   7 +-
 .../sql/kafka010/KafkaSourceProvider.scala      | 105 +++-
 .../spark/sql/kafka010/KafkaWriteTask.scala     |  71 ++-
 .../apache/spark/sql/kafka010/KafkaWriter.scala |   5 +-
 .../sql/kafka010/KafkaContinuousSinkSuite.scala | 476 ++++++++++++++++
 .../kafka010/KafkaContinuousSourceSuite.scala   |  96 ++++
 .../sql/kafka010/KafkaContinuousTest.scala      |  94 ++++
 .../spark/sql/kafka010/KafkaSourceSuite.scala   | 539 ++++++++++---------
 .../org/apache/spark/sql/DataFrameReader.scala  |  32 +-
 .../org/apache/spark/sql/DataFrameWriter.scala  |  25 +-
 .../datasources/v2/WriteToDataSourceV2.scala    |   8 +-
 .../execution/streaming/StreamExecution.scala   |  15 +-
 .../ContinuousDataSourceRDDIter.scala           |   4 +-
 .../continuous/ContinuousExecution.scala        |  67 ++-
 .../streaming/continuous/EpochCoordinator.scala |  21 +-
 .../spark/sql/streaming/DataStreamWriter.scala  |  26 +-
 .../apache/spark/sql/streaming/StreamTest.scala |  36 +-
 21 files changed, 1628 insertions(+), 416 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
new file mode 100644
index 0000000..fc97797
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
@@ -0,0 +1,260 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.TimeoutException
+
+import org.apache.kafka.clients.consumer.{ConsumerRecord, OffsetOutOfRangeException}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.TaskContext
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the actual data will be
+ *                      read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ *                      are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in data loss
+ *                       scenarios, where some offsets after the specified initial ones can't be
+ *                       properly read.
+ */
+class KafkaContinuousReader(
+    offsetReader: KafkaOffsetReader,
+    kafkaParams: ju.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    initialOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private val pollTimeoutMs = sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", "512").toLong
+
+  // Initialized when creating read tasks. If this diverges from the partitions at the latest
+  // offsets, we need to reconfigure.
+  // Exposed outside this object only for unit tests.
+  private[sql] var knownPartitions: Set[TopicPartition] = _
+
+  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
+
+  private var offset: Offset = _
+  override def setOffset(start: ju.Optional[Offset]): Unit = {
+    offset = start.orElse {
+      val offsets = initialOffsets match {
+        case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+        case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+        case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+      }
+      logInfo(s"Initial offsets: $offsets")
+      offsets
+    }
+  }
+
+  override def getStartOffset(): Offset = offset
+
+  override def deserializeOffset(json: String): Offset = {
+    KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def createUnsafeRowReadTasks(): ju.List[ReadTask[UnsafeRow]] = {
+    import scala.collection.JavaConverters._
+
+    val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset)
+
+    val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet
+    val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet)
+    val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq)
+
+    val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet)
+    if (deletedPartitions.nonEmpty) {
+      reportDataLoss(s"Some partitions were deleted: $deletedPartitions")
+    }
+
+    val startOffsets = newPartitionOffsets ++
+      oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_))
+    knownPartitions = startOffsets.keySet
+
+    startOffsets.toSeq.map {
+      case (topicPartition, start) =>
+        KafkaContinuousReadTask(
+          topicPartition, start, kafkaParams, pollTimeoutMs, failOnDataLoss)
+          .asInstanceOf[ReadTask[UnsafeRow]]
+    }.asJava
+  }
+
+  /** Stop this source and free any resources it has allocated. */
+  def stop(): Unit = synchronized {
+    offsetReader.close()
+  }
+
+  override def commit(end: Offset): Unit = {}
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    val mergedMap = offsets.map {
+      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+    }.reduce(_ ++ _)
+    KafkaSourceOffset(mergedMap)
+  }
+
+  override def needsReconfiguration(): Boolean = {
+    knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions
+  }
+
+  override def toString(): String = s"KafkaSource[$offsetReader]"
+
+  /**
+   * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`.
+   * Otherwise, just log a warning.
+   */
+  private def reportDataLoss(message: String): Unit = {
+    if (failOnDataLoss) {
+      throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
+    } else {
+      logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
+    }
+  }
+}
+
+/**
+ * A read task for continuous Kafka processing. This will be serialized and transformed into a
+ * full reader on executors.
+ *
+ * @param topicPartition The (topic, partition) pair this task is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+case class KafkaContinuousReadTask(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ReadTask[UnsafeRow] {
+  override def createDataReader(): KafkaContinuousDataReader = {
+    new KafkaContinuousDataReader(
+      topicPartition, startOffset, kafkaParams, pollTimeoutMs, failOnDataLoss)
+  }
+}
+
+/**
+ * A per-task data reader for continuous Kafka processing.
+ *
+ * @param topicPartition The (topic, partition) pair this data reader is responsible for.
+ * @param startOffset The offset to start reading from within the partition.
+ * @param kafkaParams Kafka consumer params to use.
+ * @param pollTimeoutMs The timeout for Kafka consumer polling.
+ * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets
+ *                       are skipped.
+ */
+class KafkaContinuousDataReader(
+    topicPartition: TopicPartition,
+    startOffset: Long,
+    kafkaParams: ju.Map[String, Object],
+    pollTimeoutMs: Long,
+    failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] {
+  private val topic = topicPartition.topic
+  private val kafkaPartition = topicPartition.partition
+  private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams)
+
+  private val sharedRow = new UnsafeRow(7)
+  private val bufferHolder = new BufferHolder(sharedRow)
+  private val rowWriter = new UnsafeRowWriter(bufferHolder, 7)
+
+  private var nextKafkaOffset = startOffset
+  private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _
+
+  override def next(): Boolean = {
+    var r: ConsumerRecord[Array[Byte], Array[Byte]] = null
+    while (r == null) {
+      if (TaskContext.get().isInterrupted() || TaskContext.get().isCompleted()) return false
+      // Our consumer.get is not interruptible, so we have to set a low poll timeout, leaving
+      // interrupt points to end the query rather than waiting for new data that might never come.
+      try {
+        r = consumer.get(
+          nextKafkaOffset,
+          untilOffset = Long.MaxValue,
+          pollTimeoutMs,
+          failOnDataLoss)
+      } catch {
+        // We didn't read within the timeout. We're supposed to block indefinitely for new data, so
+        // swallow and ignore this.
+        case _: TimeoutException =>
+
+        // This is a failOnDataLoss exception. Retry if nextKafkaOffset is within the data range,
+        // or if it's the endpoint of the data range (i.e. the "true" next offset).
+        case e: IllegalStateException if e.getCause.isInstanceOf[OffsetOutOfRangeException] =>
+          val range = consumer.getAvailableOffsetRange()
+          if (range.latest >= nextKafkaOffset && range.earliest <= nextKafkaOffset) {
+            // retry
+          } else {
+            throw e
+          }
+      }
+    }
+    nextKafkaOffset = r.offset + 1
+    currentRecord = r
+    true
+  }
+
+  override def get(): UnsafeRow = {
+    bufferHolder.reset()
+
+    if (currentRecord.key == null) {
+      rowWriter.setNullAt(0)
+    } else {
+      rowWriter.write(0, currentRecord.key)
+    }
+    rowWriter.write(1, currentRecord.value)
+    rowWriter.write(2, UTF8String.fromString(currentRecord.topic))
+    rowWriter.write(3, currentRecord.partition)
+    rowWriter.write(4, currentRecord.offset)
+    rowWriter.write(5,
+      DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp)))
+    rowWriter.write(6, currentRecord.timestampType.id)
+    sharedRow.setTotalSize(bufferHolder.totalSize)
+    sharedRow
+  }
+
+  override def getOffset(): KafkaSourcePartitionOffset = {
+    KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset)
+  }
+
+  override def close(): Unit = {
+    consumer.close()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
new file mode 100644
index 0000000..9843f46
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata}
+import scala.collection.JavaConverters._
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
+import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory.
+ * @param topic The topic this writer is responsible for. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaContinuousWriter(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic)
+
+  override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory =
+    KafkaContinuousWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate
+ * the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will be inferred from
+ *              a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaContinuousWriterFactory(
+    topic: Option[String], producerParams: Map[String, String], schema: StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = {
+    new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred
+ *                    from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaContinuousDataWriter(
+    targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+    new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+    checkForErrors()
+    sendRow(row, producer)
+  }
+
+  def commit(): WriterCommitMessage = {
+    // Send is asynchronous, but we can't commit until all rows are actually in Kafka.
+    // This requires flushing and then checking that no callbacks produced errors.
+    // We also check for errors before to fail as soon as possible - the check is cheap.
+    checkForErrors()
+    producer.flush()
+    checkForErrors()
+    KafkaWriterCommitMessage
+  }
+
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+    checkForErrors()
+    if (producer != null) {
+      producer.flush()
+      checkForErrors()
+      CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
index 3e65949..551641c 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
@@ -117,10 +117,14 @@ private[kafka010] class KafkaOffsetReader(
    * Resolves the specific offsets based on Kafka seek positions.
    * This method resolves offset value -1 to the latest and -2 to the
    * earliest Kafka seek position.
+   *
+   * @param partitionOffsets the specific offsets to resolve
+   * @param reportDataLoss callback to either report or log data loss depending on setting
    */
   def fetchSpecificOffsets(
-      partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] =
-    runUninterruptibly {
+      partitionOffsets: Map[TopicPartition, Long],
+      reportDataLoss: String => Unit): KafkaSourceOffset = {
+    val fetched = runUninterruptibly {
       withRetriesWithoutInterrupt {
         // Poll to get the latest assigned partitions
         consumer.poll(0)
@@ -145,6 +149,19 @@ private[kafka010] class KafkaOffsetReader(
       }
     }
 
+    partitionOffsets.foreach {
+      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+        off != KafkaOffsetRangeLimit.EARLIEST =>
+        if (fetched(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+    KafkaSourceOffset(fetched)
+  }
+
   /**
    * Fetch the earliest offsets for the topic partitions that are indicated
    * in the [[ConsumerStrategy]].

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
index 864a92b..169a5d0 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
@@ -130,7 +130,7 @@ private[kafka010] class KafkaSource(
       val offsets = startingOffsets match {
         case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
         case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
-        case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+        case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss)
       }
       metadataLog.add(0, offsets)
       logInfo(s"Initial offsets: $offsets")
@@ -138,21 +138,6 @@ private[kafka010] class KafkaSource(
     }.partitionToOffsets
   }
 
-  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
-    val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
-    specificOffsets.foreach {
-      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
-          off != KafkaOffsetRangeLimit.EARLIEST =>
-        if (result(tp) != off) {
-          reportDataLoss(
-            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
-        }
-      case _ =>
-      // no real way to check that beginning or end is reasonable
-    }
-    KafkaSourceOffset(result)
-  }
-
   private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None
 
   override def schema: StructType = KafkaOffsetReader.kafkaSchema

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
index b5da415..c82154c 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
@@ -20,17 +20,22 @@ package org.apache.spark.sql.kafka010
 import org.apache.kafka.common.TopicPartition
 
 import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
 
 /**
  * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
  * their offsets.
  */
 private[kafka010]
-case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset {
+case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
 
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
 
+private[kafka010]
+case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long)
+  extends PartitionOffset
+
 /** Companion object of the [[KafkaSourceOffset]] */
 private[kafka010] object KafkaSourceOffset {
 

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
index 3cb4d8c..3914370 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
@@ -18,7 +18,7 @@
 package org.apache.spark.sql.kafka010
 
 import java.{util => ju}
-import java.util.{Locale, UUID}
+import java.util.{Locale, Optional, UUID}
 
 import scala.collection.JavaConverters._
 
@@ -27,9 +27,12 @@ import org.apache.kafka.clients.producer.ProducerConfig
 import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer}
 
 import org.apache.spark.internal.Logging
-import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext}
+import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source}
 import org.apache.spark.sql.sources._
+import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options}
+import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
@@ -43,6 +46,8 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     with StreamSinkProvider
     with RelationProvider
     with CreatableRelationProvider
+    with ContinuousWriteSupport
+    with ContinuousReadSupport
     with Logging {
   import KafkaSourceProvider._
 
@@ -101,6 +106,43 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
       failOnDataLoss(caseInsensitiveParams))
   }
 
+  override def createContinuousReader(
+      schema: Optional[StructType],
+      metadataPath: String,
+      options: DataSourceV2Options): KafkaContinuousReader = {
+    val parameters = options.asMap().asScala.toMap
+    validateStreamOptions(parameters)
+    // Each running query should use its own group id. Otherwise, the query may be only assigned
+    // partial data since Kafka will assign partitions to multiple consumers having the same group
+    // id. Hence, we should generate a unique id for each query.
+    val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
+
+    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+    val specifiedKafkaParams =
+      parameters
+        .keySet
+        .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+        .map { k => k.drop(6).toString -> parameters(k) }
+        .toMap
+
+    val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams,
+      STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit)
+
+    val kafkaOffsetReader = new KafkaOffsetReader(
+      strategy(caseInsensitiveParams),
+      kafkaParamsForDriver(specifiedKafkaParams),
+      parameters,
+      driverGroupIdPrefix = s"$uniqueGroupId-driver")
+
+    new KafkaContinuousReader(
+      kafkaOffsetReader,
+      kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+      parameters,
+      metadataPath,
+      startingStreamOffsets,
+      failOnDataLoss(caseInsensitiveParams))
+  }
+
   /**
    * Returns a new base relation with the given parameters.
    *
@@ -181,26 +223,22 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
     }
   }
 
-  private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = {
-    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
-    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
-          + "are serialized with ByteArraySerializer.")
-    }
+  override def createContinuousWriter(
+      queryId: String,
+      schema: StructType,
+      mode: OutputMode,
+      options: DataSourceV2Options): Optional[ContinuousWriter] = {
+    import scala.collection.JavaConverters._
 
-    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
-    {
-      throw new IllegalArgumentException(
-        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
-          + "value are serialized with ByteArraySerializer.")
-    }
-    parameters
-      .keySet
-      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-      .map { k => k.drop(6).toString -> parameters(k) }
-      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
-        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+    val spark = SparkSession.getActiveSession.get
+    val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+    // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable.
+    val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap)
+
+    KafkaWriter.validateQuery(
+      schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic)
+
+    Optional.of(new KafkaContinuousWriter(topic, producerParams, schema))
   }
 
   private def strategy(caseInsensitiveParams: Map[String, String]) =
@@ -450,4 +488,27 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
 
     def build(): ju.Map[String, Object] = map
   }
+
+  private[kafka010] def kafkaParamsForProducer(
+      parameters: Map[String, String]): Map[String, String] = {
+    val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) }
+    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys "
+          + "are serialized with ByteArraySerializer.")
+    }
+
+    if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
+    {
+      throw new IllegalArgumentException(
+        s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as "
+          + "value are serialized with ByteArraySerializer.")
+    }
+    parameters
+      .keySet
+      .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
+      .map { k => k.drop(6).toString -> parameters(k) }
+      .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName,
+      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
index 6fd333e..baa60fe 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
@@ -33,10 +33,8 @@ import org.apache.spark.sql.types.{BinaryType, StringType}
 private[kafka010] class KafkaWriteTask(
     producerConfiguration: ju.Map[String, Object],
     inputSchema: Seq[Attribute],
-    topic: Option[String]) {
+    topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) {
   // used to synchronize with Kafka callbacks
-  @volatile private var failedWrite: Exception = null
-  private val projection = createProjection
   private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _
 
   /**
@@ -46,23 +44,7 @@ private[kafka010] class KafkaWriteTask(
     producer = CachedKafkaProducer.getOrCreate(producerConfiguration)
     while (iterator.hasNext && failedWrite == null) {
       val currentRow = iterator.next()
-      val projectedRow = projection(currentRow)
-      val topic = projectedRow.getUTF8String(0)
-      val key = projectedRow.getBinary(1)
-      val value = projectedRow.getBinary(2)
-      if (topic == null) {
-        throw new NullPointerException(s"null topic present in the data. Use the " +
-        s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
-      }
-      val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
-      val callback = new Callback() {
-        override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
-          if (failedWrite == null && e != null) {
-            failedWrite = e
-          }
-        }
-      }
-      producer.send(record, callback)
+      sendRow(currentRow, producer)
     }
   }
 
@@ -74,8 +56,49 @@ private[kafka010] class KafkaWriteTask(
       producer = null
     }
   }
+}
+
+private[kafka010] abstract class KafkaRowWriter(
+    inputSchema: Seq[Attribute], topic: Option[String]) {
+
+  // used to synchronize with Kafka callbacks
+  @volatile protected var failedWrite: Exception = _
+  protected val projection = createProjection
+
+  private val callback = new Callback() {
+    override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = {
+      if (failedWrite == null && e != null) {
+        failedWrite = e
+      }
+    }
+  }
 
-  private def createProjection: UnsafeProjection = {
+  /**
+   * Send the specified row to the producer, with a callback that will save any exception
+   * to failedWrite. Note that send is asynchronous; subclasses must flush() their producer before
+   * assuming the row is in Kafka.
+   */
+  protected def sendRow(
+      row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = {
+    val projectedRow = projection(row)
+    val topic = projectedRow.getUTF8String(0)
+    val key = projectedRow.getBinary(1)
+    val value = projectedRow.getBinary(2)
+    if (topic == null) {
+      throw new NullPointerException(s"null topic present in the data. Use the " +
+        s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.")
+    }
+    val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value)
+    producer.send(record, callback)
+  }
+
+  protected def checkForErrors(): Unit = {
+    if (failedWrite != null) {
+      throw failedWrite
+    }
+  }
+
+  private def createProjection = {
     val topicExpression = topic.map(Literal(_)).orElse {
       inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME)
     }.getOrElse {
@@ -112,11 +135,5 @@ private[kafka010] class KafkaWriteTask(
       Seq(topicExpression, Cast(keyExpression, BinaryType),
         Cast(valueExpression, BinaryType)), inputSchema)
   }
-
-  private def checkForErrors(): Unit = {
-    if (failedWrite != null) {
-      throw failedWrite
-    }
-  }
 }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
index 5e9ae35..15cd448 100644
--- 
a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
+++ 
b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala
@@ -43,10 +43,9 @@ private[kafka010] object KafkaWriter extends Logging {
   override def toString: String = "KafkaWriter"
 
   def validateQuery(
-      queryExecution: QueryExecution,
+      schema: Seq[Attribute],
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
-    val schema = queryExecution.analyzed.output
     schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse(
       if (topic.isEmpty) {
         throw new AnalysisException(s"topic option required when no " +
@@ -84,7 +83,7 @@ private[kafka010] object KafkaWriter extends Logging {
       kafkaParameters: ju.Map[String, Object],
       topic: Option[String] = None): Unit = {
     val schema = queryExecution.analyzed.output
-    validateQuery(queryExecution, kafkaParameters, topic)
+    validateQuery(schema, kafkaParameters, topic)
     queryExecution.toRdd.foreachPartition { iter =>
       val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic)
       Utils.tryWithSafeFinally(block = writeTask.execute(iter))(

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
new file mode 100644
index 0000000..8487a69
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
@@ -0,0 +1,476 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.execution.streaming.MemoryStream
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.{BinaryType, DataType}
+import org.apache.spark.util.Utils
+
+/**
+ * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream.
+ * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have
+ * to duplicate all the code.
+ */
+class KafkaContinuousSinkSuite extends KafkaContinuousTest {
+  import testImplicits._
+
+  override val streamingTimeout = 30.seconds
+
+  override def beforeAll(): Unit = {
+    super.beforeAll()
+    testUtils = new KafkaTestUtils(
+      withBrokerProps = Map("auto.create.topics.enable" -> "false"))
+    testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+    if (testUtils != null) {
+      testUtils.teardown()
+      testUtils = null
+    }
+    super.afterAll()
+  }
+
+  test("streaming - write to kafka with topic field") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = None,
+      withOutputMode = Some(OutputMode.Append))(
+      withSelectExpr = s"'$topic' as topic", "value")
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+      .map(_._2)
+
+    try {
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+      }
+      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+      }
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - write w/o topic field, with topic option") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Append()))()
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+      .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+      .as[(Int, Int)]
+      .map(_._2)
+
+    try {
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+      }
+      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+      }
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("streaming - topic field and topic option") {
+    /* The purpose of this test is to ensure that the topic option
+     * overrides the topic field. We begin by writing some data that
+     * includes a topic field and value (e.g., 'foo') along with a topic
+     * option. Then when we read from the topic specified in the option
+     * we should see the data i.e., the data was written to the topic
+     * option, and not to the topic in the data e.g., foo
+     */
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    val writer = createKafkaWriter(
+      input.toDF(),
+      withTopic = Some(topic),
+      withOutputMode = Some(OutputMode.Append()))(
+      withSelectExpr = "'foo' as topic", "CAST(value as STRING) value")
+
+    val reader = createKafkaReader(topic)
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .selectExpr("CAST(key AS INT)", "CAST(value AS INT)")
+      .as[(Int, Int)]
+      .map(_._2)
+
+    try {
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+      }
+      testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
+      eventually(timeout(streamingTimeout)) {
+        checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+      }
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("null topic attribute") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    /* No topic field or topic option */
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = "CAST(null as STRING) as topic", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getCause.getCause.getMessage
+      .toLowerCase(Locale.ROOT)
+      .contains("null topic present in the data."))
+  }
+
+  test("streaming - write data with bad schema") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    /* No topic field or topic option */
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = "value as key", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage
+      .toLowerCase(Locale.ROOT)
+      .contains("topic option required when no 'topic' attribute is present"))
+
+    try {
+      /* No value field */
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "value as key"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "required attribute 'value' not found"))
+  }
+
+  test("streaming - write data with valid schema but wrong types") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+      .selectExpr("CAST(value as STRING) value")
+    val topic = newTopic()
+    testUtils.createTopic(topic)
+
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      /* topic field wrong type */
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"CAST('1' as INT) as topic", "value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be 
a string"))
+
+    try {
+      /* value field wrong type */
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "value attribute type must be a string or binarytype"))
+
+    try {
+      /* key field wrong type */
+      writer = createKafkaWriter(input.toDF())(
+        withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", 
"value"
+      )
+      testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+      "key attribute type must be a string or binarytype"))
+  }
+
+  test("streaming - write to non-existing topic") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .option("startingOffsets", "earliest")
+      .load()
+    val topic = newTopic()
+
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      ex = intercept[StreamingQueryException] {
+        writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))()
+        testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+        eventually(timeout(streamingTimeout)) {
+          assert(writer.exception.isDefined)
+        }
+        throw writer.exception.get
+      }
+    } finally {
+      writer.stop()
+    }
+    assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted"))
+  }
+
+  test("streaming - exception on config serializer") {
+    val inputTopic = newTopic()
+    testUtils.createTopic(inputTopic, partitions = 1)
+    testUtils.sendMessages(inputTopic, Array("0"))
+
+    val input = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("subscribe", inputTopic)
+      .load()
+    var writer: StreamingQuery = null
+    var ex: Exception = null
+    try {
+      writer = createKafkaWriter(
+        input.toDF(),
+        withOptions = Map("kafka.key.serializer" -> "foo"))()
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+        "kafka option 'key.serializer' is not supported"))
+    } finally {
+      writer.stop()
+    }
+
+    try {
+      writer = createKafkaWriter(
+        input.toDF(),
+        withOptions = Map("kafka.value.serializer" -> "foo"))()
+      eventually(timeout(streamingTimeout)) {
+        assert(writer.exception.isDefined)
+        ex = writer.exception.get
+      }
+      assert(ex.getMessage.toLowerCase(Locale.ROOT).contains(
+        "kafka option 'value.serializer' is not supported"))
+    } finally {
+      writer.stop()
+    }
+  }
+
+  test("generic - write big data with small producer buffer") {
+    /* This test ensures that we understand the semantics of Kafka when
+    * it comes to blocking on a call to send when the send buffer is full.
+    * This test will configure the smallest possible producer buffer and
+    * indicate that we should block when it is full. Thus, no exception should
+    * be thrown in the case of a full buffer.
+    */
+    val topic = newTopic()
+    testUtils.createTopic(topic, 1)
+    val options = new java.util.HashMap[String, String]
+    options.put("bootstrap.servers", testUtils.brokerAddress)
+    options.put("buffer.memory", "16384") // min buffer size
+    options.put("block.on.buffer.full", "true")
+    options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+    options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName)
+    val inputSchema = Seq(AttributeReference("value", BinaryType)())
+    val data = new Array[Byte](15000) // large value
+    val writeTask = new KafkaContinuousDataWriter(Some(topic), options.asScala.toMap, inputSchema)
+    try {
+      val fieldTypes: Array[DataType] = Array(BinaryType)
+      val converter = UnsafeProjection.create(fieldTypes)
+      val row = new SpecificInternalRow(fieldTypes)
+      row.update(0, data)
+      val iter = Seq.fill(1000)(converter.apply(row)).iterator
+      iter.foreach(writeTask.write(_))
+      writeTask.commit()
+    } finally {
+      writeTask.close()
+    }
+  }
+
+  private def createKafkaReader(topic: String): DataFrame = {
+    spark.read
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("startingOffsets", "earliest")
+      .option("endingOffsets", "latest")
+      .option("subscribe", topic)
+      .load()
+  }
+
+  private def createKafkaWriter(
+      input: DataFrame,
+      withTopic: Option[String] = None,
+      withOutputMode: Option[OutputMode] = None,
+      withOptions: Map[String, String] = Map[String, String]())
+      (withSelectExpr: String*): StreamingQuery = {
+    var stream: DataStreamWriter[Row] = null
+    val checkpointDir = Utils.createTempDir()
+    var df = input.toDF()
+    if (withSelectExpr.length > 0) {
+      df = df.selectExpr(withSelectExpr: _*)
+    }
+    stream = df.writeStream
+      .format("kafka")
+      .option("checkpointLocation", checkpointDir.getCanonicalPath)
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      // We need to reduce blocking time to efficiently test non-existent partition behavior.
+      .option("kafka.max.block.ms", "1000")
+      .trigger(Trigger.Continuous(1000))
+      .queryName("kafkaStream")
+    withTopic.foreach(stream.option("topic", _))
+    withOutputMode.foreach(stream.outputMode(_))
+    withOptions.foreach(opt => stream.option(opt._1, opt._2))
+    stream.start()
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
----------------------------------------------------------------------
diff --git 
a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
new file mode 100644
index 0000000..b3dade4
--- /dev/null
+++ 
b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+// Run tests in KafkaSourceSuiteBase in continuous execution mode.
+class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
+
+class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest {
+  import testImplicits._
+
+  override val brokerProps = Map("auto.create.topics.enable" -> "false")
+
+  test("subscribing topic by pattern with topic deletions") {
+    val topicPrefix = newTopic()
+    val topic = topicPrefix + "-seems"
+    val topic2 = topicPrefix + "-bad"
+    testUtils.createTopic(topic, partitions = 5)
+    testUtils.sendMessages(topic, Array("-1"))
+    require(testUtils.getLatestOffsets(Set(topic)).size === 5)
+
+    val reader = spark
+      .readStream
+      .format("kafka")
+      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+      .option("kafka.metadata.max.age.ms", "1")
+      .option("subscribePattern", s"$topicPrefix-.*")
+      .option("failOnDataLoss", "false")
+
+    val kafka = reader.load()
+      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
+      .as[(String, String)]
+    val mapped = kafka.map(kv => kv._2.toInt + 1)
+
+    testStream(mapped)(
+      makeSureGetOffsetCalled,
+      AddKafkaData(Set(topic), 1, 2, 3),
+      CheckAnswer(2, 3, 4),
+      Execute { query =>
+        testUtils.deleteTopic(topic)
+        testUtils.createTopic(topic2, partitions = 5)
+        eventually(timeout(streamingTimeout)) {
+          assert(
+            query.lastExecution.logical.collectFirst {
+              case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+            }.exists { r =>
+              // Ensure the new topic is present and the old topic is gone.
+              r.knownPartitions.exists(_.topic == topic2)
+            },
+            s"query never reconfigured to new topic $topic2")
+        }
+      },
+      AddKafkaData(Set(topic2), 4, 5, 6),
+      CheckAnswer(2, 3, 4, 5, 6, 7)
+    )
+  }
+}
+
+class KafkaContinuousSourceStressForDontFailOnDataLossSuite
+    extends KafkaSourceStressForDontFailOnDataLossSuite {
+  override protected def startStream(ds: Dataset[Int]) = {
+    ds.writeStream
+      .format("memory")
+      .queryName("memory")
+      .start()
+  }
+}
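
A minimal sketch of the subscription pattern exercised by KafkaContinuousSourceTopicDeletionSuite above: a continuous query that subscribes to topics by pattern and keeps running across topic deletions. Not part of the patch; the object name, broker address, pattern, and checkpoint path are placeholders.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object ContinuousPatternSubscriptionSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[4]")
      .appName("continuous-pattern-subscription-sketch")
      .getOrCreate()

    // Subscribe to every topic matching the prefix. A low metadata max age makes the
    // reader notice created/deleted topics quickly, and failOnDataLoss=false keeps the
    // query alive when a matched topic disappears.
    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092") // placeholder broker
      .option("kafka.metadata.max.age.ms", "1")
      .option("subscribePattern", "topic-prefix-.*")       // placeholder pattern
      .option("failOnDataLoss", "false")
      .load()
      .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    val query = kafka.writeStream
      .format("console")
      .option("checkpointLocation", "/tmp/continuous-pattern-subscription") // placeholder path
      .trigger(Trigger.Continuous(1000)) // 1000 ms checkpoint interval
      .start()

    query.awaitTermination()
  }
}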

http://git-wip-us.apache.org/repos/asf/spark/blob/16670578/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
----------------------------------------------------------------------
diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
new file mode 100644
index 0000000..5a1a14f
--- /dev/null
+++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.test.TestSparkSession
+
+// Trait to configure StreamTest for kafka continuous execution tests.
+trait KafkaContinuousTest extends KafkaSourceTest {
+  override val defaultTrigger = Trigger.Continuous(1000)
+  override val defaultUseV2Sink = true
+
+  // We need more than the default local[2] to be able to schedule all partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+    new SparkContext(
+      "local[10]",
+      "continuous-stream-test-sql-context",
+      sparkConf.set("spark.sql.testkey", "true")))
+
+  // In addition to setting the partitions in Kafka, we have to wait until the query has
+  // reconfigured to the new count so the test framework can hook in properly.
+  override protected def setTopicPartitions(
+      topic: String, newCount: Int, query: StreamExecution) = {
+    testUtils.addPartitions(topic, newCount)
+    eventually(timeout(streamingTimeout)) {
+      assert(
+        query.lastExecution.logical.collectFirst {
+          case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+        }.exists(_.knownPartitions.size == newCount),
+        s"query never reconfigured to $newCount partitions")
+    }
+  }
+
+  // Continuous processing tasks end asynchronously, so test that they actually end.
+  private val tasksEndedListener = new SparkListener() {
+    val activeTaskIdCount = new AtomicInteger(0)
+
+    override def onTaskStart(start: SparkListenerTaskStart): Unit = {
+      activeTaskIdCount.incrementAndGet()
+    }
+
+    override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
+      activeTaskIdCount.decrementAndGet()
+    }
+  }
+
+  override def beforeEach(): Unit = {
+    super.beforeEach()
+    spark.sparkContext.addSparkListener(tasksEndedListener)
+  }
+
+  override def afterEach(): Unit = {
+    eventually(timeout(streamingTimeout)) {
+      assert(tasksEndedListener.activeTaskIdCount.get() == 0)
+    }
+    spark.sparkContext.removeSparkListener(tasksEndedListener)
+    super.afterEach()
+  }
+
+
+  test("ensure continuous stream is being used") {
+    val query = spark.readStream
+      .format("rate")
+      .option("numPartitions", "1")
+      .option("rowsPerSecond", "1")
+      .load()
+
+    testStream(query)(
+      Execute(q => assert(q.isInstanceOf[ContinuousExecution]))
+    )
+  }
+}
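
The trait above tracks asynchronously-ending continuous tasks with a SparkListener and waits in afterEach for the counter to drain to zero. A standalone sketch of that pattern follows; the class and object names and the awaitNoActiveTasks helper (a plain polling loop standing in for ScalaTest's eventually) are illustrative and not part of the patch.

import java.util.concurrent.atomic.AtomicInteger

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd, SparkListenerTaskStart}

// Counts tasks that have started but not yet ended, so a test can wait for
// continuous-processing tasks to finish before making assertions about shutdown.
class ActiveTaskCountListener extends SparkListener {
  val activeTaskCount = new AtomicInteger(0)

  override def onTaskStart(start: SparkListenerTaskStart): Unit = {
    activeTaskCount.incrementAndGet()
  }

  override def onTaskEnd(end: SparkListenerTaskEnd): Unit = {
    activeTaskCount.decrementAndGet()
  }
}

object ActiveTaskCountListener {
  // Polls until the active-task count drains to zero or the deadline passes.
  // Register the listener first with sc.addSparkListener(listener).
  def awaitNoActiveTasks(listener: ActiveTaskCountListener, timeoutMs: Long): Boolean = {
    val deadline = System.currentTimeMillis() + timeoutMs
    while (listener.activeTaskCount.get() != 0 && System.currentTimeMillis() < deadline) {
      Thread.sleep(100)
    }
    listener.activeTaskCount.get() == 0
  }
}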

