[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-16 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20554


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168626487
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest 
with SharedSQLContext {
 query.nonEmpty,
 "Cannot add data when there is no query for finding the active 
kafka source")
 
-  val sources = query.get.logicalPlan.collect {
-case StreamingExecutionRelation(source: KafkaSource, _) => source
-  } ++ (query.get.lastExecution match {
-case null => Seq()
-case e => e.logical.collect {
-  case DataSourceV2Relation(_, reader: KafkaContinuousReader) => 
reader
-}
-  })
+  val sources = {
+query.get.logicalPlan.collect {
+  case StreamingExecutionRelation(source: KafkaSource, _) => source
+  case StreamingExecutionRelation(source: KafkaMicroBatchReader, 
_) => source
+} ++ (query.get.lastExecution match {
+  case null => Seq()
+  case e => e.logical.collect {
+case DataSourceV2Relation(_, reader: KafkaContinuousReader) => 
reader
+  }
+})
+  }.distinct
--- End diff --

yes.
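
For context, a rough sketch of the situation that motivates the `.distinct`, 
assuming a self-join test in the style of the suite's other tests (the query 
below is illustrative, reuses the suite's `spark`, `testUtils` and `topic` 
helpers, and is not the suite's exact code). A self-joined streaming DataFrame 
references the same Kafka relation twice, so collecting sources from the 
logical plan would otherwise return the same instance twice:

    val kafka = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", testUtils.brokerAddress)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()
      .selectExpr("CAST(key AS STRING) AS key", "CAST(value AS STRING) AS value")

    // The same Kafka source appears twice in this plan; without .distinct the
    // collect over the logical plan would yield that one source two times.
    val selfJoined = kafka.join(kafka, Seq("key"))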


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168625863
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
 ---
@@ -0,0 +1,2 @@
+0v9
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
--- End diff --

done


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168625742
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
 ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --

I modified this to make the test "deserialization of initial offset written 
by Spark 2.1.0" stronger. See the updated test. The way it works now: we start 
the query from the earliest offsets while simultaneously providing an 
initial-offset file whose offsets are NOT 0. We then check that the query reads 
its first offsets from that initial-offset file rather than from the earliest 
offsets available in the topic. Hence I am only changing the values in the file 
a little, not the format.
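
To make the intent concrete, here is a rough sketch of the strengthened test's 
shape (it assumes the suite's `spark`, `testUtils` and `withTempDir` helpers 
that appear in the quoted code; the checkpoint layout and assertions below are 
illustrative, not the exact code in the PR):

    import java.io.File
    import java.nio.file.Files

    withTempDir { checkpointDir =>
      val topic = "kafka-initial-offset-2-1-0"
      testUtils.createTopic(topic, partitions = 3)
      // Put records in every partition so that "earliest" (offset 0) differs
      // from the offsets {0 -> 0, 1 -> 1, 2 -> 2} recorded in the 2.1.0 file.
      (0 until 3).foreach { p =>
        testUtils.sendMessages(topic, Array("0", "1", "2"), Some(p))
      }

      // Assumed checkpoint layout: the initial offset of source 0 is read
      // back from <checkpoint>/sources/0/0.
      val initialOffsetFile = new File(checkpointDir, "sources/0/0")
      initialOffsetFile.getParentFile.mkdirs()
      Files.copy(
        getClass.getResourceAsStream("/kafka-source-initial-offset-version-2.1.0.bin"),
        initialOffsetFile.toPath)

      val query = spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", testUtils.brokerAddress)
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")
        .load()
        .writeStream
        .format("memory")
        .queryName("kafka_2_1_0_compat")
        .option("checkpointLocation", checkpointDir.getAbsolutePath)
        .start()
      try {
        query.processAllAvailable()
        // Expect only the records at and after the offsets in the file, i.e.
        // the query honored the logged initial offsets, not "earliest".
      } finally {
        query.stop()
      }
    }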


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168558972
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchSourceSuite.scala
 ---
@@ -112,14 +112,18 @@ abstract class KafkaSourceTest extends StreamTest 
with SharedSQLContext {
 query.nonEmpty,
 "Cannot add data when there is no query for finding the active 
kafka source")
 
-  val sources = query.get.logicalPlan.collect {
-case StreamingExecutionRelation(source: KafkaSource, _) => source
-  } ++ (query.get.lastExecution match {
-case null => Seq()
-case e => e.logical.collect {
-  case DataSourceV2Relation(_, reader: KafkaContinuousReader) => 
reader
-}
-  })
+  val sources = {
+query.get.logicalPlan.collect {
+  case StreamingExecutionRelation(source: KafkaSource, _) => source
+  case StreamingExecutionRelation(source: KafkaMicroBatchReader, 
_) => source
+} ++ (query.get.lastExecution match {
+  case null => Seq()
+  case e => e.logical.collect {
+case DataSourceV2Relation(_, reader: KafkaContinuousReader) => 
reader
+  }
+})
+  }.distinct
--- End diff --

Is the distinct for the self join test?


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168559060
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
--- End diff --

+1


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168591005
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168558562
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -306,7 +307,7 @@ private[kafka010] class KafkaSource(
 kafkaReader.close()
   }
 
-  override def toString(): String = s"KafkaSource[$kafkaReader]"
+  override def toString(): String = s"KafkaSourceV1[$kafkaReader]"
--- End diff --

good catch


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-15 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167127585
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
 ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --

Why does this need to be modified? The point of this file IIUC is to ensure 
that compatibility is maintained with offsets logged in old versions, so I 
worry something's wrong if we need to update it.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168366098
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
--- End diff --

Added 


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-14 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r168120568
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167815276
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167811474
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -28,50 +28,40 @@ import 
org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySe
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, 
SparkSession, SQLContext}
-import org.apache.spark.sql.execution.streaming.{Sink, Source}
+import org.apache.spark.sql.execution.streaming.Sink
 import org.apache.spark.sql.sources._
-import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, StreamWriteSupport}
+import org.apache.spark.sql.sources.v2.{ContinuousReadSupport, 
DataSourceOptions, MicroBatchReadSupport, StreamWriteSupport}
 import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
 import org.apache.spark.sql.streaming.OutputMode
 import org.apache.spark.sql.types.StructType
 
 /**
- * The provider class for the [[KafkaSource]]. This provider is designed 
such that it throws
+ * The provider class for all Kafka readers and writers. It is designed 
such that it throws
  * IllegalArgumentException when the Kafka Dataset is created, so that it 
can catch
  * missing options even before the query is started.
  */
 private[kafka010] class KafkaSourceProvider extends DataSourceRegister
-with StreamSourceProvider
 with StreamSinkProvider
 with RelationProvider
 with CreatableRelationProvider
 with StreamWriteSupport
 with ContinuousReadSupport
+with MicroBatchReadSupport
 with Logging {
   import KafkaSourceProvider._
 
   override def shortName(): String = "kafka"
 
   /**
-   * Returns the name and schema of the source. In addition, it also 
verifies whether the options
-   * are correct and sufficient to create the [[KafkaSource]] when the 
query is started.
+   * Creates a 
[[org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader]] to read 
batches
+   * of Kafka data in a micro-batch streaming query.
*/
-  override def sourceSchema(
-  sqlContext: SQLContext,
-  schema: Option[StructType],
-  providerName: String,
-  parameters: Map[String, String]): (String, StructType) = {
-validateStreamOptions(parameters)
-require(schema.isEmpty, "Kafka source has a fixed schema and cannot be 
set with a custom one")
-(shortName(), KafkaOffsetReader.kafkaSchema)
-  }
-
-  override def createSource(
-  sqlContext: SQLContext,
+  def createMicroBatchReader(
--- End diff --

nit: `override`


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167807584
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-13 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167809278
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
--- End diff --

Agreed. How about just writing a test to make sure we do write 0 at the 
beginning?
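
A possible shape for that check, pieced together from the simulation code 
being removed (`Utils`, `metadataPath`, `FileInputStream` and the `/0` file 
name are the ones already used in the quoted test; the assertion itself is 
the only new bit and is a sketch, not the final test):

    // After the source has written its initial offset to the metadata log,
    // verify that the file really does start with a zero byte, which is what
    // keeps the new format detectable as unreadable by Spark 2.1.0.
    Utils.tryWithResource(new FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
      assert(in.read() === 0)
    }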


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124768
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
-"deserialization of initial offset with Spark 2.1.0") {
-withTempDir { metadataPath =>
-  val topic = newTopic
-  testUtils.createTopic(topic, partitions = 3)
-
-  val provider = new KafkaSourceProvider
-  val parameters = Map(
-"kafka.bootstrap.servers" -> testUtils.brokerAddress,
-"subscribe" -> topic
-  )
-  val source = provider.createSource(spark.sqlContext, 
metadataPath.getAbsolutePath, None,
-"", parameters)
-  source.getOffset.get // Write initial offset
-
-  // Make sure Spark 2.1.0 will throw an exception when reading the 
new log
-  intercept[java.lang.IllegalArgumentException] {
-// Simulate how Spark 2.1.0 reads the log
-Utils.tryWithResource(new 
FileInputStream(metadataPath.getAbsolutePath + "/0")) { in =>
-  val length = in.read()
-  val bytes = new Array[Byte](length)
-  in.read(bytes)
-  KafkaSourceOffset(SerializedOffset(new String(bytes, UTF_8)))
-}
-  }
-}
-  }
-
-  testWithUninterruptibleThread("deserialization of initial offset written 
by Spark 2.1.0") {
+  test("deserialization of initial offset written by Spark 2.1.0") {
 withTempDir { metadataPath =>
--- End diff --

Changed the two tests below to not use the source/reader directly (too 
low-level and implementation-dependent) and to instead actually run a streaming 
query using the sample initial offset files in `test/resources`.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124564
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -303,94 +302,75 @@ class KafkaMicroBatchSourceSuite extends 
KafkaSourceSuiteBase {
 )
   }
 
-  testWithUninterruptibleThread(
--- End diff --

I think this test is superfluous and does not test anything useful. As with 
the other modified tests, "simulating" an implementation is a BAD test, and in 
this particular case it is attempting to simulate the 2.1.0 log, which is no 
longer necessary.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124346
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-future-version.bin
 ---
@@ -0,0 +1,2 @@
+0v9
+{"kafka-initial-offset-future-version":{"2":2,"1":1,"0":0}}
--- End diff --

note: should remove the newline to keep it consistent


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167124308
  
--- Diff: 
external/kafka-0-10-sql/src/test/resources/kafka-source-initial-offset-version-2.1.0.bin
 ---
@@ -1 +1 @@
-2{"kafka-initial-offset-2-1-0":{"2":0,"1":0,"0":0}}
\ No newline at end of file
+2{"kafka-initial-offset-2-1-0":{"2":2,"1":1,"0":0}}
--- End diff --

note: should remove the newline to keep it consistent.


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123917
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123837
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123713
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123614
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123580
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123513
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaMicroBatchReader.scala
 ---
@@ -0,0 +1,410 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.io._
+import java.nio.charset.StandardCharsets
+
+import scala.collection.JavaConverters._
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.SparkEnv
+import org.apache.spark.internal.Logging
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.DataSourceOptions
+import org.apache.spark.sql.sources.v2.reader.{DataReader, 
DataReaderFactory}
+import org.apache.spark.sql.sources.v2.reader.streaming.{MicroBatchReader, 
Offset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.util.UninterruptibleThread
+
+/**
+ * A [[MicroBatchReader]] that reads data from Kafka.
+ *
+ * The [[KafkaSourceOffset]] is the custom [[Offset]] defined for this 
source that contains
+ * a map of TopicPartition -> offset. Note that this offset is 1 + 
(available offset). For
+ * example if the last record in a Kafka topic "t", partition 2 is offset 
5, then
+ * KafkaSourceOffset will contain TopicPartition("t", 2) -> 6. This is 
done keep it consistent
+ * with the semantics of `KafkaConsumer.position()`.
+ *
+ * Zero data lost is not guaranteed when topics are deleted. If zero data 
lost is critical, the user
+ * must make sure all messages in a topic have been processed when 
deleting a topic.
+ *
+ * There is a known issue caused by KAFKA-1894: the query using Kafka 
maybe cannot be stopped.
+ * To avoid this issue, you should make sure stopping the query before 
stopping the Kafka brokers
+ * and not use wrong broker addresses.
+ */
+private[kafka010] class KafkaMicroBatchReader(
+kafkaOffsetReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+options: DataSourceOptions,
+metadataPath: String,
+startingOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends MicroBatchReader with Logging {
+
+  type PartitionOffsetMap = Map[TopicPartition, Long]
+
+  private var startPartitionOffsets: PartitionOffsetMap = _
+  private var endPartitionOffsets: PartitionOffsetMap = _
+
+  private val pollTimeoutMs = options.getLong(
+"kafkaConsumer.pollTimeoutMs",
+SparkEnv.get.conf.getTimeAsMs("spark.network.timeout", "120s"))
+
+  private val maxOffsetsPerTrigger =
+Option(options.get("maxOffsetsPerTrigger").orElse(null)).map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = 
getOrCreateInitialPartitionOffsets()
+
+  override def setOffsetRange(start: ju.Optional[Offset], end: 
ju.Optional[Offset]): Unit = {
+// Make sure initialPartitionOffsets is initialized
+initialPartitionOffsets
+
+startPartitionOffsets = Option(start.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+.getOrElse(initialPartitionOffsets)
+
+endPartitionOffsets = Option(end.orElse(null))
+.map(_.asInstanceOf[KafkaSourceOffset].partitionToOffsets)
+

[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20554#discussion_r167123199
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -408,8 +401,27 @@ private[kafka010] object KafkaSourceProvider extends 
Logging {
   private[kafka010] val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
   private[kafka010] val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
   val TOPIC_OPTION_KEY = "topic"
 
+  val INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE =
--- End diff --

Moved this from KafkaSource to this class because it is used by multiple 
reader classes and therefore should live in the higher-level class, i.e. the 
provider class.
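
The resulting sharing pattern, roughly (the import is the one visible in the 
quoted KafkaMicroBatchReader diff; the method body is only an illustrative 
sketch of how both readers use the shared strings, not a verbatim copy of 
either reader):

    import org.apache.spark.sql.kafka010.KafkaSourceProvider.{
      INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}

    // Each reader appends the appropriate instruction when it detects data loss.
    private def reportDataLoss(message: String): Unit = {
      if (failOnDataLoss) {
        throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE")
      } else {
        logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE")
      }
    }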


---




[GitHub] spark pull request #20554: [SPARK-23362][SS] Migrate Kafka Microbatch source...

2018-02-08 Thread tdas
GitHub user tdas opened a pull request:

https://github.com/apache/spark/pull/20554

[SPARK-23362][SS] Migrate Kafka Microbatch source to v2

## What changes were proposed in this pull request?
Migrating KafkaSource (with data source v1) to KafkaMicroBatchReader (with 
data source v2).

## How was this patch tested?
Existing tests; a few were modified to be stronger than the originals.

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tdas/spark SPARK-23362

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20554.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20554


commit 3ed2a509276194214875f39e1e18d8093155c54c
Author: Tathagata Das 
Date:   2018-02-09T01:46:56Z

Migrated




---
