[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-11 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/20096


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-10 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160774506
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -39,6 +39,15 @@ private[continuous] sealed trait EpochCoordinatorMessage 
extends Serializable
  */
 private[sql] case object IncrementAndGetEpoch extends 
EpochCoordinatorMessage
 
+/**
--- End diff --

looks good to me


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-10 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160640668
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala
 ---
@@ -39,6 +39,15 @@ private[continuous] sealed trait EpochCoordinatorMessage 
extends Serializable
  */
 private[sql] case object IncrementAndGetEpoch extends 
EpochCoordinatorMessage
 
+/**
--- End diff --

@zsxwing Can you take a look at these changes in this file?


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160570055
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -49,28 +52,37 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext {
 
   override val streamingTimeout = 30.seconds
 
+  protected val brokerProps = Map[String, Object]()
+
   override def beforeAll(): Unit = {
 super.beforeAll()
-testUtils = new KafkaTestUtils
+testUtils = new KafkaTestUtils(brokerProps)
 testUtils.setup()
   }
 
   override def afterAll(): Unit = {
 if (testUtils != null) {
   testUtils.teardown()
   testUtils = null
-  super.afterAll()
 }
+super.afterAll()
   }
 
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
 // Because KafkaSource's initialPartitionOffsets is set lazily, we 
need to make sure
-// its "getOffset" is called before pushing any data. Otherwise, 
because of the race contion,
+// its "getOffset" is called before pushing any data. Otherwise, 
because of the race contOOion,
--- End diff --

I remember wondering this morning why my command-O key sequence wasn't 
working... I guess this is where it went.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160569877
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
 }
   }
 
-  test("(de)serialization of initial offsets") {
+  test("KafkaSource with watermark") {
+val now = System.currentTimeMillis()
 val topic = newTopic()
-testUtils.createTopic(topic, partitions = 64)
+testUtils.createTopic(newTopic(), partitions = 1)
+testUtils.sendMessages(topic, Array(1).map(_.toString))
 
-val reader = spark
+val kafka = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("startingOffsets", s"earliest")
   .option("subscribe", topic)
+  .load()
 
-testStream(reader.load)(
-  makeSureGetOffsetCalled,
-  StopStream,
-  StartStream(),
-  StopStream)
+val windowedAggregation = kafka
+  .withWatermark("timestamp", "10 seconds")
+  .groupBy(window($"timestamp", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start") as 'window, $"count")
+
+val query = windowedAggregation
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("kafkaWatermark")
+  .start()
+query.processAllAvailable()
+val rows = spark.table("kafkaWatermark").collect()
+assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+val row = rows(0)
+// We cannot check the exact window start time as it depands on the 
time that messages were
+// inserted by the producer. So here we just use a low bound to make 
sure the internal
+// conversion works.
+assert(
+  row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+  s"Unexpected results: $row")
+assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+query.stop()
   }
+}
 
-  test("maxOffsetsPerTrigger") {
+class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+  import testImplicits._
+
+  test("(de)serialization of initial offsets") {
 val topic = newTopic()
-testUtils.createTopic(topic, partitions = 3)
-testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, 
Some(0))
-testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, 
Some(1))
-testUtils.sendMessages(topic, Array("1"), Some(2))
+testUtils.createTopic(topic, partitions = 5)
 
 val reader = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-  .option("kafka.metadata.max.age.ms", "1")
-  .option("maxOffsetsPerTrigger", 10)
   .option("subscribe", topic)
-  .option("startingOffsets", "earliest")
-val kafka = reader.load()
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
-
-val clock = new StreamManualClock
-
-val waitUntilBatchProcessed = AssertOnQuery { q =>
-  eventually(Timeout(streamingTimeout)) {
-if (!q.exception.isDefined) {
-  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-}
-  }
-  if (q.exception.isDefined) {
-throw q.exception.get
-  }
-  true
-}
 
-testStream(mapped)(
-  StartStream(ProcessingTime(100), clock),
-  waitUntilBatchProcessed,
-  // 1 from smallest, 1 from middle, 8 from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
-  AdvanceManualClock(100),
-  waitUntilBatchProcessed,
-  // smallest now empty, 1 more from middle, 9 more from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-11, 108, 109, 110, 111, 112, 113, 114, 115, 116
-  ),
+testStream(reader.load)(
+  makeSureGetOffsetCalled,
   StopStream,
-  StartStream(ProcessingTime(100), clock),
-  waitUntilBatchProcessed,
-  // smallest now empty, 1 more from middle, 9 more from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-12, 117, 118, 119, 120, 121, 122, 123, 124, 125
-  ),
-  AdvanceManualClock(100),
-  waitUntilBatchProcessed,
-  // 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160569556
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite 
extends StreamTest with Shared
 }
   }
 
-  test("stress test for failOnDataLoss=false") {
-val reader = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-  .option("kafka.metadata.max.age.ms", "1")
-  .option("subscribePattern", "failOnDataLoss.*")
-  .option("startingOffsets", "earliest")
-  .option("failOnDataLoss", "false")
-  .option("fetchOffset.retryIntervalMs", "3000")
-val kafka = reader.load()
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new 
ForeachWriter[Int] {
+  protected def startStream(ds: Dataset[Int]) = {
--- End diff --

startStream is overridden in the continuous version of this test.
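A rough sketch of what that override could look like in the continuous variant; the trigger interval and the no-op ForeachWriter body here are assumptions, not the PR's actual code:

```scala
// Hypothetical continuous override of startStream(ds); the base suite builds the
// equivalent microbatch ForeachWriter query shown in the diff above.
override protected def startStream(ds: Dataset[Int]) = {
  ds.writeStream
    .trigger(Trigger.Continuous(1000))
    .foreach(new ForeachWriter[Int] {
      override def open(partitionId: Long, version: Long): Boolean = true
      override def process(value: Int): Unit = {}   // the real test records/validates values here
      override def close(errorOrNull: Throwable): Unit = {}
    })
    .start()
}
```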


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160552554
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,135 @@
+/*
--- End diff --

Rename this file to KafkaContinuousSourceSuite


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160550486
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
 }
   }
 
-  test("(de)serialization of initial offsets") {
+  test("KafkaSource with watermark") {
+val now = System.currentTimeMillis()
 val topic = newTopic()
-testUtils.createTopic(topic, partitions = 64)
+testUtils.createTopic(newTopic(), partitions = 1)
+testUtils.sendMessages(topic, Array(1).map(_.toString))
 
-val reader = spark
+val kafka = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("startingOffsets", s"earliest")
   .option("subscribe", topic)
+  .load()
 
-testStream(reader.load)(
-  makeSureGetOffsetCalled,
-  StopStream,
-  StartStream(),
-  StopStream)
+val windowedAggregation = kafka
+  .withWatermark("timestamp", "10 seconds")
+  .groupBy(window($"timestamp", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start") as 'window, $"count")
+
+val query = windowedAggregation
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("kafkaWatermark")
+  .start()
+query.processAllAvailable()
+val rows = spark.table("kafkaWatermark").collect()
+assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+val row = rows(0)
+// We cannot check the exact window start time as it depands on the 
time that messages were
+// inserted by the producer. So here we just use a low bound to make 
sure the internal
+// conversion works.
+assert(
+  row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+  s"Unexpected results: $row")
+assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+query.stop()
   }
+}
 
-  test("maxOffsetsPerTrigger") {
+class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+  import testImplicits._
+
+  test("(de)serialization of initial offsets") {
 val topic = newTopic()
-testUtils.createTopic(topic, partitions = 3)
-testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, 
Some(0))
-testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, 
Some(1))
-testUtils.sendMessages(topic, Array("1"), Some(2))
+testUtils.createTopic(topic, partitions = 5)
 
 val reader = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-  .option("kafka.metadata.max.age.ms", "1")
-  .option("maxOffsetsPerTrigger", 10)
   .option("subscribe", topic)
-  .option("startingOffsets", "earliest")
-val kafka = reader.load()
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => 
kv._2.toInt)
-
-val clock = new StreamManualClock
-
-val waitUntilBatchProcessed = AssertOnQuery { q =>
-  eventually(Timeout(streamingTimeout)) {
-if (!q.exception.isDefined) {
-  assert(clock.isStreamWaitingAt(clock.getTimeMillis()))
-}
-  }
-  if (q.exception.isDefined) {
-throw q.exception.get
-  }
-  true
-}
 
-testStream(mapped)(
-  StartStream(ProcessingTime(100), clock),
-  waitUntilBatchProcessed,
-  // 1 from smallest, 1 from middle, 8 from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107),
-  AdvanceManualClock(100),
-  waitUntilBatchProcessed,
-  // smallest now empty, 1 more from middle, 9 more from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-11, 108, 109, 110, 111, 112, 113, 114, 115, 116
-  ),
+testStream(reader.load)(
+  makeSureGetOffsetCalled,
   StopStream,
-  StartStream(ProcessingTime(100), clock),
-  waitUntilBatchProcessed,
-  // smallest now empty, 1 more from middle, 9 more from biggest
-  CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107,
-11, 108, 109, 110, 111, 112, 113, 114, 115, 116,
-12, 117, 118, 119, 120, 121, 122, 123, 124, 125
-  ),
-  AdvanceManualClock(100),
-  waitUntilBatchProcessed,
-  // smallest now 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160546427
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
--- End diff --

Add docs to explain what this class is for.
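A sketch of the kind of doc comment being asked for, written only from what the trait overrides in this diff:

```scala
/**
 * Mixin that re-runs the shared Kafka tests under continuous processing: it switches the
 * default trigger to Trigger.Continuous, defaults to the V2 sink, and uses a larger local
 * cluster (local[10]) so all Kafka partitions can be scheduled simultaneously.
 */
trait KafkaContinuousTest extends KafkaSourceTest
```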


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160559942
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala
 ---
@@ -0,0 +1,348 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Locale
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.clients.producer.ProducerConfig
+import org.apache.kafka.common.serialization.ByteArraySerializer
+import org.scalatest.time.SpanSugar._
+import scala.collection.JavaConverters._
+
+import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
+import org.apache.spark.sql.catalyst.expressions.{AttributeReference, 
SpecificInternalRow, UnsafeProjection}
+import org.apache.spark.sql.streaming._
+import org.apache.spark.sql.types.{BinaryType, DataType}
+import org.apache.spark.util.Utils
+
+/**
+ * This is a temporary port of KafkaSinkSuite, since we do not yet have a 
V2 memory stream.
+ * Once we have one, this will be changed to a specialization of 
KafkaSinkSuite and we won't have
+ * to duplicate all the code.
+ */
+class KafkaContinuousSinkSuite extends KafkaContinuousTest {
+  import testImplicits._
+
+  override val streamingTimeout = 30.seconds
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils(
+  withBrokerProps = Map("auto.create.topics.enable" -> "false"))
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+}
+super.afterAll()
+  }
+
+  test("streaming - write to kafka with topic field") {
+val inputTopic = newTopic()
+testUtils.createTopic(inputTopic, partitions = 1)
+
+val input = spark
+  .readStream
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", inputTopic)
+  .option("startingOffsets", "earliest")
+  .load()
+
+val topic = newTopic()
+testUtils.createTopic(topic)
+
+val writer = createKafkaWriter(
+  input.toDF(),
+  withTopic = None,
+  withOutputMode = Some(OutputMode.Append))(
+  withSelectExpr = s"'$topic' as topic", "value")
+
+val reader = createKafkaReader(topic)
+  .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value")
+  .selectExpr("CAST(key as INT) key", "CAST(value as INT) value")
+  .as[(Int, Int)]
+  .map(_._2)
+
+try {
+  testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
+  failAfter(streamingTimeout) {
+writer.processAllAvailable()
+  }
+  checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
+  testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10"))
+  failAfter(streamingTimeout) {
+writer.processAllAvailable()
+  }
+  checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
+} finally {
+  writer.stop()
+}
+  }
+
+  test("streaming - write data with bad schema") {
--- End diff --

Missing tests for "w/o topic field, with topic option" and "topic field and topic option", and also a test for the case when the topic field is null. A sketch of the first case follows below.
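A rough sketch of that first missing case, reusing the helpers already visible in this suite; the exact expressions passed to createKafkaWriter are an assumption:

```scala
test("streaming - write w/o topic field, with topic option") {
  val inputTopic = newTopic()
  testUtils.createTopic(inputTopic, partitions = 1)

  val input = spark
    .readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", testUtils.brokerAddress)
    .option("subscribe", inputTopic)
    .option("startingOffsets", "earliest")
    .load()

  val topic = newTopic()
  testUtils.createTopic(topic)

  // The target topic comes from the writer option rather than a per-row "topic" column.
  val writer = createKafkaWriter(
    input.toDF(),
    withTopic = Some(topic),
    withOutputMode = Some(OutputMode.Append))(
    withSelectExpr = "value")

  val reader = createKafkaReader(topic)
    .selectExpr("CAST(value as INT) value")
    .as[Int]

  try {
    testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5"))
    failAfter(streamingTimeout) {
      writer.processAllAvailable()
    }
    checkDatasetUnorderly(reader, 1, 2, 3, 4, 5)
  } finally {
    writer.stop()
  }
}
```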


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160546593
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
+  override val defaultTrigger = Trigger.Continuous(1000)
+  override val defaultUseV2Sink = true
+
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  // In addition to setting the partitions in Kafka, we have to wait until 
the query has
+  // reconfigured to the new count so the test framework can hook in 
properly.
+  override protected def setTopicPartitions(
+  topic: String, newCount: Int, query: StreamExecution) = {
+testUtils.addPartitions(topic, newCount)
+eventually(timeout(streamingTimeout)) {
+  assert(
+query.lastExecution.logical.collectFirst {
+  case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+}.exists(_.knownPartitions.size == newCount),
+s"query never reconfigured to $newCount partitions")
+}
+  }
+
+  test("ensure continuous stream is being used") {
+val query = spark.readStream
+  .format("rate")
+  .option("numPartitions", "1")
+  .option("rowsPerSecond", "1")
+  .load()
+
+testStream(query)(
+  Execute(q => assert(q.isInstanceOf[ContinuousExecution]))
+)
+  }
+}
+
+class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with 
KafkaContinuousTest {
--- End diff --

The `{  }` may not be needed.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160546455
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
+  override val defaultTrigger = Trigger.Continuous(1000)
+  override val defaultUseV2Sink = true
+
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  // In addition to setting the partitions in Kafka, we have to wait until 
the query has
+  // reconfigured to the new count so the test framework can hook in 
properly.
+  override protected def setTopicPartitions(
+  topic: String, newCount: Int, query: StreamExecution) = {
+testUtils.addPartitions(topic, newCount)
+eventually(timeout(streamingTimeout)) {
+  assert(
+query.lastExecution.logical.collectFirst {
+  case DataSourceV2Relation(_, r: KafkaContinuousReader) => r
+}.exists(_.knownPartitions.size == newCount),
+s"query never reconfigured to $newCount partitions")
+}
+  }
+
+  test("ensure continuous stream is being used") {
+val query = spark.readStream
+  .format("rate")
+  .option("numPartitions", "1")
+  .option("rowsPerSecond", "1")
+  .load()
+
+testStream(query)(
+  Execute(q => assert(q.isInstanceOf[ContinuousExecution]))
+)
+  }
+}
+
+class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with 
KafkaContinuousTest {
--- End diff --

Add docs.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160550392
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest {
 }
   }
 
-  test("(de)serialization of initial offsets") {
+  test("KafkaSource with watermark") {
+val now = System.currentTimeMillis()
 val topic = newTopic()
-testUtils.createTopic(topic, partitions = 64)
+testUtils.createTopic(newTopic(), partitions = 1)
+testUtils.sendMessages(topic, Array(1).map(_.toString))
 
-val reader = spark
+val kafka = spark
   .readStream
   .format("kafka")
   .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("kafka.metadata.max.age.ms", "1")
+  .option("startingOffsets", s"earliest")
   .option("subscribe", topic)
+  .load()
 
-testStream(reader.load)(
-  makeSureGetOffsetCalled,
-  StopStream,
-  StartStream(),
-  StopStream)
+val windowedAggregation = kafka
+  .withWatermark("timestamp", "10 seconds")
+  .groupBy(window($"timestamp", "5 seconds") as 'window)
+  .agg(count("*") as 'count)
+  .select($"window".getField("start") as 'window, $"count")
+
+val query = windowedAggregation
+  .writeStream
+  .format("memory")
+  .outputMode("complete")
+  .queryName("kafkaWatermark")
+  .start()
+query.processAllAvailable()
+val rows = spark.table("kafkaWatermark").collect()
+assert(rows.length === 1, s"Unexpected results: ${rows.toList}")
+val row = rows(0)
+// We cannot check the exact window start time as it depands on the 
time that messages were
+// inserted by the producer. So here we just use a low bound to make 
sure the internal
+// conversion works.
+assert(
+  row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000,
+  s"Unexpected results: $row")
+assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row")
+query.stop()
   }
+}
 
-  test("maxOffsetsPerTrigger") {
+class KafkaSourceSuiteBase extends KafkaSourceTest {
+
+  import testImplicits._
+
+  test("(de)serialization of initial offsets") {
--- End diff --

Is this needed in the common KafkaSourceSuiteBase?


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160549136
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -49,28 +52,37 @@ abstract class KafkaSourceTest extends StreamTest with 
SharedSQLContext {
 
   override val streamingTimeout = 30.seconds
 
+  protected val brokerProps = Map[String, Object]()
+
   override def beforeAll(): Unit = {
 super.beforeAll()
-testUtils = new KafkaTestUtils
+testUtils = new KafkaTestUtils(brokerProps)
 testUtils.setup()
   }
 
   override def afterAll(): Unit = {
 if (testUtils != null) {
   testUtils.teardown()
   testUtils = null
-  super.afterAll()
 }
+super.afterAll()
   }
 
   protected def makeSureGetOffsetCalled = AssertOnQuery { q =>
 // Because KafkaSource's initialPartitionOffsets is set lazily, we 
need to make sure
-// its "getOffset" is called before pushing any data. Otherwise, 
because of the race contion,
+// its "getOffset" is called before pushing any data. Otherwise, 
because of the race contOOion,
--- End diff --

spelling mistake?


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160516280
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
+  override val defaultTrigger = Trigger.Continuous(1000)
+  override val defaultUseV2Sink = true
+
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  override protected def setTopicPartitions(
--- End diff --

Add a comment on what this method does. It is asserting something, so it does not look like it only "sets" something.
  


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160552160
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite 
extends StreamTest with Shared
 }
   }
 
-  test("stress test for failOnDataLoss=false") {
-val reader = spark
-  .readStream
-  .format("kafka")
-  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
-  .option("kafka.metadata.max.age.ms", "1")
-  .option("subscribePattern", "failOnDataLoss.*")
-  .option("startingOffsets", "earliest")
-  .option("failOnDataLoss", "false")
-  .option("fetchOffset.retryIntervalMs", "3000")
-val kafka = reader.load()
-  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
-  .as[(String, String)]
-val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new 
ForeachWriter[Int] {
+  protected def startStream(ds: Dataset[Int]) = {
--- End diff --

I think this refactoring is not needed. `startStream()` is not used anywhere other than in this test, so I don't see the point of pulling it out and defining it outside the test.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160552695
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
--- End diff --

Also, since this is used not just by the source but also by the sink, it is better to define it in a separate file.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-09 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160516177
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.StreamExecution
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.{StreamTest, Trigger}
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+trait KafkaContinuousTest extends KafkaSourceTest {
+  override val defaultTrigger = Trigger.Continuous(1000)
+  override val defaultUseV2Sink = true
+
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  override protected def setTopicPartitions(
--- End diff --

Add a comment on what this method does. It is asserting something, so it does not look like it only "sets" something.


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160009573
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
+import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, SerializedOffset}
--- End diff --

Ummm.. I think it's better to rename the new Offset to OffsetV2 than to rename the old one to OffsetV1. That will keep it more consistent with the other APIs which have V2 in them. Also, the MicroBatchExecution in the other PR uses OffsetV2. Sorry for not thinking ahead on this.
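A minimal sketch of that direction, assuming the v2 Offset is the one KafkaContinuousReader imports from org.apache.spark.sql.sources.v2.streaming.reader:

```scala
package org.apache.spark.sql.kafka010

import org.apache.kafka.common.TopicPartition

// Leave org.apache.spark.sql.execution.streaming.Offset untouched and alias the new one instead.
import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2}

private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
  override val json: String = JsonUtils.partitionOffsets(partitionToOffsets)
}
```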


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160004815
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the 
actual data will be
+ *  read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The 
[[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ *  are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing 
metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in 
data loss
+ *   scenarios, where some offsets after the specified 
initial ones can't be
+ *   properly read.
+ */
+class KafkaContinuousReader(
+offsetReader: KafkaOffsetReader,
+kafkaParams: java.util.Map[String, Object],
--- End diff --

Since there are lots of different uses of java.util.*, you can probably alias java.util to ju. That's what the KafkaSourceProvider class does.
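For reference, the alias pattern being suggested is just the standard Scala import rename, e.g. (sketch only, not the PR's code):

```scala
import java.{util => ju}
import scala.collection.JavaConverters._

object JavaUtilAliasExample {
  // With the alias, signatures read `ju.Map[String, Object]` instead of `java.util.Map[String, Object]`.
  def describeParams(kafkaParams: ju.Map[String, Object]): String =
    kafkaParams.asScala.map { case (k, v) => s"$k -> $v" }.mkString(", ")
}
```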


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160007884
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -418,11 +418,16 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given 
`source` has reached at
* least the given `Offset`. This method is intended for use primarily 
when writing tests.
*/
-  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit 
= {
 assertAwaitThread()
 def notDone = {
   val localCommittedOffsets = committedOffsets
-  !localCommittedOffsets.contains(source) || 
localCommittedOffsets(source) != newOffset
+  if (sources.length <= sourceIndex) {
+false
--- End diff --

The race condition is present because `sources` is initialized to Seq.empty and then assigned to the actual sources. You can instead initialize `sources` to null and return `notDone = false` while `sources` is null; any other mismatch should throw an error. I don't like the current code, which hides erroneous situations.
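Concretely, the suggestion amounts to something like this inside `awaitOffset` (a sketch of the proposal against the quoted diff, not the final code; `sources`, `committedOffsets`, `sourceIndex` and `newOffset` come from the surrounding StreamExecution):

```scala
def notDone = {
  val localCommittedOffsets = committedOffsets
  if (sources == null) {
    // `sources` has not been assigned yet; per the suggestion, don't treat this as a mismatch.
    false
  } else if (sourceIndex >= sources.length) {
    // Any other mismatch is a real bug and should fail loudly instead of being hidden.
    throw new IllegalStateException(
      s"Source index $sourceIndex is out of range for ${sources.length} sources")
  } else {
    val source = sources(sourceIndex)
    !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset
  }
}
```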
  


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160006676
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala
 ---
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection}
+import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+/**
+ * Dummy commit message. The DataSourceV2 framework requires a commit 
message implementation but we
+ * don't need to really send one.
+ */
+case object KafkaWriterCommitMessage extends WriterCommitMessage
+
+/**
+ * A [[ContinuousWriter]] for Kafka writing. Responsible for generating 
the writer factory.
+ * @param topic The topic this writer is responsible for. If None, topic 
will be inferred from
+ *  a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+class KafkaContinuousWriter(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  override def createInternalRowWriterFactory(): 
KafkaContinuousWriterFactory =
+KafkaContinuousWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+/**
+ * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent 
to executors to generate
+ * the per-task data writers.
+ * @param topic The topic that should be written to. If None, topic will 
be inferred from
+ *  a `topic` field in the incoming data.
+ * @param producerParams Parameters for Kafka producers in each task.
+ * @param schema The schema of the input data.
+ */
+case class KafkaContinuousWriterFactory(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+new KafkaContinuousDataWriter(topic, producerParams, 
schema.toAttributes)
+  }
+}
+
+/**
+ * A [[DataWriter]] for Kafka writing. One data writer will be created in 
each partition to
+ * process incoming rows.
+ *
+ * @param targetTopic The topic that this data writer is targeting. If 
None, topic will be inferred
+ *from a `topic` field in the incoming data.
+ * @param producerParams Parameters to use for the Kafka producer.
+ * @param inputSchema The attributes in the input data.
+ */
+class KafkaContinuousDataWriter(
+targetTopic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
+  extends KafkaRowWriter(inputSchema, targetTopic) with 
DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+new java.util.HashMap[String, Object](producerParams.asJava))
+
+  def write(row: InternalRow): Unit = {
+checkForErrors()
+sendRow(row, producer)
+  }
+
+  def commit(): 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160005666
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala
 ---
@@ -74,8 +56,49 @@ private[kafka010] class KafkaWriteTask(
   producer = null
 }
   }
+}
+
+private[kafka010] class KafkaRowWriter(
--- End diff --

nit: make this a trait so that it cannot be instantiated directly.
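A generic illustration of the nit, with plain names rather than the connector's actual classes: as a trait, the shared row-writing logic can be mixed in but never instantiated on its own.

```scala
trait RowWriter {
  // Shared send/error-checking logic lives here.
  protected def sendRow(row: String): Unit = println(s"sending: $row")
}

class WriteTask extends RowWriter {
  def execute(rows: Iterator[String]): Unit = rows.foreach(sendRow)
}

object RowWriterDemo extends App {
  // A bare `new RowWriter` does not compile, so the helper cannot be used directly;
  // it is only reachable through a concrete subclass such as WriteTask.
  new WriteTask().execute(Iterator("a", "b"))
}
```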


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159985929
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -145,6 +149,19 @@ private[kafka010] class KafkaOffsetReader(
   }
 }
 
+partitionOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (fetched(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${fetched(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
--- End diff --

nit: wrong indent


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r160008897
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala
 ---
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingExecutionRelation, StreamingQueryWrapper}
+import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaContinuousSuite extends KafkaSourceTest with SharedSQLContext {
+  import testImplicits._
+
+  // We need more than the default local[2] to be able to schedule all 
partitions simultaneously.
+  override protected def createSparkSession = new TestSparkSession(
+new SparkContext(
+  "local[10]",
+  "continuous-stream-test-sql-context",
+  sparkConf.set("spark.sql.testkey", "true")))
+
+  test("basic") {
--- End diff --

You are not testing all the cases of starting offsets, and other options. `KafkaSourceSuite` tests them, and since this is a whole different code path than microbatch, all of these cases should be tested explicitly.

I think KafkaSourceSuite can be refactored so that its code can be reused (see the sketch below). It's some amount of work, but without it we are not confident about testing all the new code paths.

The same applies to the sink: there are combinations of options that are not tested (default topic specified/not specified, null value in the topic column).
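The later revisions quoted in this thread converge on roughly the following structure; sketched here with class bodies elided:

```scala
// Shared Kafka test harness (broker setup, makeSureGetOffsetCalled, ...).
abstract class KafkaSourceTest extends StreamTest with SharedSQLContext

// Tests that are meaningful for both microbatch and continuous execution.
class KafkaSourceSuiteBase extends KafkaSourceTest

// Continuous-specific defaults, mixed into both the source and the sink suites.
trait KafkaContinuousTest extends KafkaSourceTest {
  override val defaultTrigger = Trigger.Continuous(1000)
  override val defaultUseV2Sink = true
}

// The continuous source suite then reuses every shared test unchanged.
class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest
```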


---




[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159984700
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala
 ---
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+/**
+ * A [[ContinuousReader]] for data from kafka.
+ *
+ * @param offsetReader  a reader used to get kafka offsets. Note that the 
actual data will be
+ *  read by per-task consumers generated later.
+ * @param kafkaParams   String params for per-task Kafka consumers.
+ * @param sourceOptions The 
[[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which
+ *  are not Kafka consumer params.
+ * @param metadataPath Path to a directory this reader can use for writing 
metadata.
+ * @param initialOffsets The Kafka offsets to start reading data at.
+ * @param failOnDataLoss Flag indicating whether reading should fail in 
data loss
+ *   scenarios, where some offsets after the specified 
initial ones can't be
+ *   properly read.
+ */
+class KafkaContinuousReader(
+offsetReader: KafkaOffsetReader,
+kafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the latest
+  // offsets, we need to reconfigure.
+  // Exposed outside this object only for unit tests.
+  private[sql] var knownPartitions: Set[TopicPartition] = _
+
+  override def readSchema: StructType = KafkaOffsetReader.kafkaSchema
+
+  private var offset: Offset = _
+  override def setOffset(start: java.util.Optional[Offset]): Unit = {
+offset = start.orElse {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(offsetReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => 
offsetReader.fetchSpecificOffsets(p, reportDataLoss)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets
+}
+  }
+
+  override def getStartOffset(): Offset = offset
+
+  override def deserializeOffset(json: String): Offset = {
+KafkaSourceOffset(JsonUtils.partitionOffsets(json))
+  }
+
+  override def createUnsafeRowReadTasks(): 
java.util.List[ReadTask[UnsafeRow]] = {
+import scala.collection.JavaConverters._
+
+val oldStartPartitionOffsets = 
KafkaSourceOffset.getPartitionOffsets(offset)
+
+val newPartitions =
+  
offsetReader.fetchLatestOffsets().keySet.diff(oldStartPartitionOffsets.keySet)
+val newPartitionOffsets = 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-05 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159983463
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -261,6 +261,10 @@ private[kafka010] case class CachedKafkaConsumer 
private(
 }
   }
 
+  def wakeup(): Unit = {
--- End diff --

This does not seem to be used anywhere?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159797164
  
--- Diff: 
sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala
 ---
@@ -418,11 +418,16 @@ abstract class StreamExecution(
* Blocks the current thread until processing for data from the given 
`source` has reached at
* least the given `Offset`. This method is intended for use primarily 
when writing tests.
*/
-  private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = {
+  private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit 
= {
 assertAwaitThread()
 def notDone = {
   val localCommittedOffsets = committedOffsets
-  !localCommittedOffsets.contains(source) || 
localCommittedOffsets(source) != newOffset
+  if (sources.length <= sourceIndex) {
+false
--- End diff --

`sources` is a var which might not be populated yet. (This race condition 
showed up in AddKafkaData in my tests.)
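
Spelled out, the guard in the diff amounts to something like the sketch below; 
the else branch is reconstructed from the line the diff removes, and the 
surrounding state is passed in explicitly here only to keep the sketch 
self-contained (the real code reads it from the enclosing `awaitOffset`):

```scala
// Sketch of the bounds check above. `sources` is a var filled in by the
// stream-execution thread, so a test thread calling awaitOffset can observe it
// before the source at `sourceIndex` exists; check before indexing into it.
def notDone(
    sources: Seq[Source],
    committedOffsets: Map[Source, Offset],
    sourceIndex: Int,
    newOffset: Offset): Boolean = {
  if (sources.length <= sourceIndex) {
    // Not populated yet: mirror the diff and fall through rather than throw
    // an IndexOutOfBoundsException from sources(sourceIndex).
    false
  } else {
    val source = sources(sourceIndex)
    !committedOffsets.contains(source) || committedOffsets(source) != newOffset
  }
}
```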


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159796525
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, 
TOPIC_OPTION_KEY}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+class ContinuousKafkaWriter(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  override def createInternalRowWriterFactory(): KafkaWriterFactory =
+KafkaWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+case class KafkaWriterFactory(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+new KafkaDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+case class KafkaWriterCommitMessage() extends WriterCommitMessage {}
+
+class KafkaDataWriter(
+topic: Option[String], producerParams: Map[String, String], 
inputSchema: Seq[Attribute])
+  extends DataWriter[InternalRow] {
+  import scala.collection.JavaConverters._
+
+  @volatile private var failedWrite: Exception = _
+  private val projection = createProjection
+  private lazy val producer = CachedKafkaProducer.getOrCreate(
+new java.util.HashMap[String, Object](producerParams.asJava))
+
+  private val callback = new Callback() {
+override def onCompletion(recordMetadata: RecordMetadata, e: 
Exception): Unit = {
+  if (failedWrite == null && e != null) {
+failedWrite = e
+  }
+}
+  }
+
+  def write(row: InternalRow): Unit = {
+if (failedWrite != null) return
+
+val projectedRow = projection(row)
+val topic = projectedRow.getUTF8String(0)
+val key = projectedRow.getBinary(1)
+val value = projectedRow.getBinary(2)
+
+if (topic == null) {
+  throw new NullPointerException(s"null topic present in the data. Use 
the " +
+s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a 
default topic.")
+}
+val record = new ProducerRecord[Array[Byte], 
Array[Byte]](topic.toString, key, value)
+producer.send(record, callback)
+  }
+
+  def commit(): WriterCommitMessage = KafkaWriterCommitMessage()
+  def abort(): Unit = {}
+
+  def close(): Unit = {
+checkForErrors()
+if (producer != null) {
+  producer.flush()
+  checkForErrors()
+}
--- End diff --

I think CachedKafkaProducer handles closing automatically, but since these 
producers are long-lived I can do it explicitly too.
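
A minimal sketch of what doing it explicitly could look like, assuming the 
cache exposes a release hook keyed by the same params (the 
`CachedKafkaProducer.close(params)` call is an assumption, not verified 
against this PR):

```scala
import scala.collection.JavaConverters._

// Hedged sketch: flush pending sends, surface any async send error, then hand
// the producer back to the cache explicitly rather than waiting for eviction.
def close(): Unit = {
  checkForErrors()
  if (producer != null) {
    producer.flush()
    checkForErrors()
    // Assumed cache API: release the producer associated with these params.
    CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava))
  }
}
```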


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159796099
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159795887
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159795747
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread jose-torres
Github user jose-torres commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159795735
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
--- End diff --

oops, no


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159569390
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159569305
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159569599
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159568911
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159728249
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, 
TOPIC_OPTION_KEY}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+class ContinuousKafkaWriter(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  override def createInternalRowWriterFactory(): KafkaWriterFactory =
+KafkaWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+case class KafkaWriterFactory(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends DataWriterFactory[InternalRow] {
+
+  override def createDataWriter(partitionId: Int, attemptNumber: Int): 
DataWriter[InternalRow] = {
+new KafkaDataWriter(topic, producerParams, schema.toAttributes)
+  }
+}
+
+case class KafkaWriterCommitMessage() extends WriterCommitMessage {}
--- End diff --

If there are no params, then this can be a `case object`. And I don't think 
the `{}` is needed.  
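
What that suggestion amounts to, sketched:

```scala
// No fields to carry, so a parameterless case object suffices and the empty
// body braces can be dropped.
case object KafkaWriterCommitMessage extends WriterCommitMessage
```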


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159568750
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159568798
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159569925
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159571068
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala
 ---
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.kafka.clients.producer.{Callback, ProducerRecord, 
RecordMetadata}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection}
+import 
org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, 
TOPIC_OPTION_KEY}
+import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter
+import org.apache.spark.sql.sources.v2.writer._
+import org.apache.spark.sql.streaming.OutputMode
+import org.apache.spark.sql.types.{BinaryType, StringType, StructType}
+
+class ContinuousKafkaWriter(
+topic: Option[String], producerParams: Map[String, String], schema: 
StructType)
+  extends ContinuousWriter with SupportsWriteInternalRow {
+
+  override def createInternalRowWriterFactory(): KafkaWriterFactory =
+KafkaWriterFactory(topic, producerParams, schema)
+
+  override def commit(epochId: Long, messages: 
Array[WriterCommitMessage]): Unit = {}
+  override def abort(messages: Array[WriterCommitMessage]): Unit = {}
+}
+
+case class KafkaWriterFactory(
--- End diff --

I know that there are docs in the DataSourceV2 API classes, but can you 
add brief docs to these classes to make it easier to understand how these classes 
relate to each other and how they are used? Otherwise jumping back and forth to 
the superclasses to read the docs is tricky.
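
For instance, something along these lines for the factory (just a sketch of the 
kind of doc being asked for; the wording, and leaving out the extends clause 
here, are only suggestions):

    import org.apache.spark.sql.types.StructType

    /**
     * Created on the driver and serialized to the executors, where it produces
     * the per-task Kafka data writers. (Sketch only.)
     *
     * @param topic the default topic to write to, if one was set as an option
     * @param producerParams Kafka producer configuration used on the executors
     * @param schema the schema of the rows being written
     */
    case class KafkaWriterFactory(
        topic: Option[String],
        producerParams: Map[String, String],
        schema: StructType)

Similar one- or two-line docs on ContinuousKafkaWriter would be enough.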


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159566309
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
--- End diff --

This function seems to be a duplicate of the one in KafkaSource. Can you 
dedup? Maybe move this into the KafkaOffsetReader?
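
For example, a rough sketch of the deduped version, assuming the method moves 
into KafkaOffsetReader and takes the data-loss callback as a parameter (the 
method name and the callback parameter are only suggestions):

    // Inside KafkaOffsetReader:
    def fetchSpecificOffsetsAndVerify(
        specificOffsets: Map[TopicPartition, Long],
        reportDataLoss: String => Unit): KafkaSourceOffset = {
      val result = fetchSpecificOffsets(specificOffsets)
      specificOffsets.foreach {
        case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
            off != KafkaOffsetRangeLimit.EARLIEST =>
          if (result(tp) != off) {
            reportDataLoss(
              s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
          }
        case _ =>
          // no real way to check that beginning or end is reasonable
      }
      KafkaSourceOffset(result)
    }

Both KafkaSource and ContinuousKafkaReader could then call this and only keep 
their own reportDataLoss behavior.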


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159570879
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159568788
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159568756
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159503021
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
--- End diff --

Add docs explaining params.
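
Something like this (a sketch of the kind of docs meant here; the wording is 
only a suggestion, based on how the params are used in this file):

    /**
     * A [[ContinuousReader]] that reads data from Kafka for a continuous
     * processing query.
     *
     * @param kafkaReader         used on the driver to fetch topic partitions and
     *                            their earliest/latest/specific offsets
     * @param executorKafkaParams Kafka consumer params for the per-task consumers
     *                            on the executors
     * @param sourceOptions       the source options, e.g. kafkaConsumer.pollTimeoutMs
     * @param metadataPath        path of this source's metadata directory in the checkpoint
     * @param initialOffsets      where to start reading when the query has no prior progress
     * @param failOnDataLoss      whether to fail the query when data loss is detected
     */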


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159571114
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
--- End diff --

Can we rename this and the other classes such that all Kafka classes start with 
"Kafka"?


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159566461
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
--- End diff --

is this used anywhere??


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159569751
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: java.util.Map[String, Object],
+sourceOptions: Map[String, String],
+metadataPath: String,
+initialOffsets: KafkaOffsetRangeLimit,
+failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+val mergedMap = offsets.map {
+  case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+}.reduce(_ ++ _)
+KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that 
`KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread 
while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+  val offsets = initialOffsets match {
+case EarliestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+case LatestOffsetRangeLimit => 
KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+  }
+  logInfo(s"Initial offsets: $offsets")
+  offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = 
{
+val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+specificOffsets.foreach {
+  case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+off != KafkaOffsetRangeLimit.EARLIEST =>
+if (result(tp) != off) {
+  reportDataLoss(
+s"startingOffsets for $tp was $off but consumer reset to 
${result(tp)}")
+}
+  case _ =>
+  // no real way to check that beginning or end is reasonable
+}
+KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the 
partitions at the 

[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159731324
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -181,26 +223,22 @@ private[kafka010] class KafkaSourceProvider extends 
DataSourceRegister
 }
   }
 
-  private def kafkaParamsForProducer(parameters: Map[String, String]): 
Map[String, String] = {
-val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase(Locale.ROOT), v) }
-if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}"))
 {
-  throw new IllegalArgumentException(
-s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is 
not supported as keys "
-  + "are serialized with ByteArraySerializer.")
-}
+  override def createContinuousWriter(
+  queryId: String,
+  schema: StructType,
+  mode: OutputMode,
+  options: DataSourceV2Options): java.util.Optional[ContinuousWriter] 
= {
+import scala.collection.JavaConverters._
 
-if 
(caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}"))
-{
-  throw new IllegalArgumentException(
-s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' 
is not supported as "
-  + "value are serialized with ByteArraySerializer.")
-}
-parameters
-  .keySet
-  .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka."))
-  .map { k => k.drop(6).toString -> parameters(k) }
-  .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> 
classOf[ByteArraySerializer].getName,
-ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> 
classOf[ByteArraySerializer].getName)
+val spark = SparkSession.getActiveSession.get
+val topic = 
Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim)
+// We convert the options argument from V2 -> Java map -> scala 
mutable -> scala immutable.
+val producerParams = 
kafkaParamsForProducer(options.asMap.asScala.toMap)
+
+KafkaWriter.validateQuery(
+  schema.toAttributes, new java.util.HashMap[String, 
Object](producerParams.asJava), topic)
+
+java.util.Optional.of(new ContinuousKafkaWriter(topic, producerParams, 
schema))
--- End diff --

Import java.util.Optional. It's being used in multiple places in this file.
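
i.e., roughly (a sketch; only the import and the two spots quoted above change):

    import java.util.Optional

    // ...the return type and the construction in createContinuousWriter then shorten to:
    //   Optional[ContinuousWriter]
    //   Optional.of(new ContinuousKafkaWriter(topic, producerParams, schema))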


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159733170
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/ContinuousKafkaSuite.scala
 ---
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.Properties
+
+import org.scalatest.time.SpanSugar._
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkContext
+import org.apache.spark.sql.{ForeachWriter, Row}
+import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation
+import 
org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, 
StreamingExecutionRelation, StreamingQueryWrapper}
+import 
org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution
+import org.apache.spark.sql.streaming.Trigger
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class ContinuousKafkaSuite extends KafkaSourceTest with SharedSQLContext {
--- End diff --

Rename to KafkaContinuousSuite to make it easier to find.
  


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159504846
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala
 ---
@@ -31,10 +32,14 @@ case class KafkaSourceOffset(partitionToOffsets: 
Map[TopicPartition, Long]) exte
   override val json = JsonUtils.partitionOffsets(partitionToOffsets)
 }
 
+private[kafka010]
+case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, 
partitionOffset: Long)
+  extends PartitionOffset
+
 /** Companion object of the [[KafkaSourceOffset]] */
 private[kafka010] object KafkaSourceOffset {
 
-  def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = {
+  def getPartitionOffsets(offset: LegacyOffset): Map[TopicPartition, Long] 
= {
--- End diff --

nit: can we use OffsetV1 or something like that to make the difference more 
obvious?
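
For example (a sketch, assuming the v1 offset class stays in 
org.apache.spark.sql.execution.streaming; the alias name follows the suggestion 
above):

    // Rename on import so the v1/v2 distinction is visible at the use sites:
    import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1}

    // ...and the helper's signature would then read:
    //   def getPartitionOffsets(offset: OffsetV1): Map[TopicPartition, Long]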


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2018-01-04 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/20096#discussion_r159563923
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, 
Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, 
UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, 
SerializedOffset}
+import 
org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE,
 INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import 
org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, 
ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+kafkaReader: KafkaOffsetReader,
--- End diff --

Please rename this to offsetReader or maybe offsetFetcher to distinguish 
this from all the Reader classes in DataSourceV2


---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...

2017-12-27 Thread jose-torres
GitHub user jose-torres opened a pull request:

https://github.com/apache/spark/pull/20096

[SPARK-22908] Add kafka source and sink for continuous processing.

## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two 
small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid 
thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State 
updates are now atomic, and we don't have to deal with swallowing an exception 
(see the sketch after this list).
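
For illustration, a minimal sketch of what an atomic transition looks like, 
assuming the state lives in an AtomicReference (the State values and names below 
are stand-ins, not the actual StreamExecution internals):

    import java.util.concurrent.atomic.AtomicReference

    object ReconfigureSketch {
      sealed trait State
      case object ACTIVE extends State
      case object RECONFIGURING extends State
      case object TERMINATED extends State

      private val state = new AtomicReference[State](ACTIVE)

      // The ACTIVE -> RECONFIGURING move succeeds only if nothing else (e.g. a
      // concurrent stop() that set TERMINATED) got there first, so no exception
      // needs to be swallowed to handle the race.
      def reconfigure(): Unit = {
        if (state.compareAndSet(ACTIVE, RECONFIGURING)) {
          // safe to tear down the current execution and restart with the new configuration
        }
      }
    }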

## How was this patch tested?

new unit tests


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark continuous-kafka

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20096.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #20096


commit eec38d374a4f3db46a26b8926192ae44da90b6ae
Author: Jose Torres 
Date:   2017-12-21T21:07:35Z

basic kafka

commit f91cb0190d5a5d7942cd3d53bc571140a03965c6
Author: Jose Torres 
Date:   2017-12-24T21:40:20Z

move reader close to data reader thread in case reader isn't thread safe

commit 7c180db439c9d3c6389c5ff6033a61341e7f1bbf
Author: Jose Torres 
Date:   2017-12-27T20:43:16Z

test + small fixes

commit 7596e34f1e8a047263da7bf8522a14869f289125
Author: Jose Torres 
Date:   2017-12-27T21:21:56Z

fixes lost in cherrypick




---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org