[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-07 Thread asfgit
Github user asfgit closed the pull request at:

https://github.com/apache/spark/pull/16686


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-07 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99908711
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -327,6 +332,39 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
+  def releaseKafkaConsumer(
+  topic: String,
+  partition: Int,
+  kafkaParams: ju.Map[String, Object]): Unit = {
+val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+val topicPartition = new TopicPartition(topic, partition)
+val key = CacheKey(groupId, topicPartition)
+
+val consumer = cache.get(key)
+if (consumer != null) {
+  consumer.inuse = false
+} else {
+  logWarning(s"Attempting to release consumer that does not exist")
+}
+  }
+
+  /**
+   * Removes (and closes) the Kafka Consumer for the given topic, 
partition and group id.
+   */
+  def removeKafkaConsumer(
+  topic: String,
+  partition: Int,
+  kafkaParams: ju.Map[String, Object]): Unit = {
+val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+val topicPartition = new TopicPartition(topic, partition)
+val key = CacheKey(groupId, topicPartition)
+
+val removedConsumer = cache.remove(key)
--- End diff --

nit: please add `synchronized`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-07 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99909068
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -327,6 +332,39 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
 }
   }
 
+  def releaseKafkaConsumer(
+  topic: String,
+  partition: Int,
+  kafkaParams: ju.Map[String, Object]): Unit = {
+val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
+val topicPartition = new TopicPartition(topic, partition)
+val key = CacheKey(groupId, topicPartition)
+
+val consumer = cache.get(key)
--- End diff --

nit: please add `synchronized`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99382639
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
--- End diff --

I'll rollback. My thought was to also indicate that it's being used to read 
TopicPartition(s).


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99381274
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
--- End diff --

I went with KafkaOffsetRangeLimit


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99328721
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+   override val sqlContext: SQLContext,
--- End diff --

incorrect indents


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99333060
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  private def createDF(topic: String,
+  withOptions: Map[String, String] = Map.empty[String, String]) = {
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+withOptions.foreach {
+  case (key, value) => df.option(key, value)
+}
+df.load().selectExpr("CAST(value AS STRING)")
+  }
+
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val df = createDF(topic,
+  withOptions = Map("startingOffsets" -> "earliest", "endingOffsets" 
-> "latest"))
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the df
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = createDF(topic)
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = JsonUtils.partitionOffsets(startPartitionOffsets)
+
+

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99332525
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String) extends Logging {
+  /**
+   * Used to ensure execute fetch operations execute in an 
UninterruptibleThread
+   */
+  val kafkaReaderThread = Executors.newSingleThreadExecutor(new 
ThreadFactory {
+override def newThread(r: Runnable): Thread = {
+  val t = new UninterruptibleThread("Kafka Offset Reader") {
+override def run(): Unit = {
+  r.run()
+}
+  }
+  t.setDaemon(true)
+  t
+}
+  })
+  val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close(): Unit = {
+consumer.close()
+kafkaReaderThread.shutdownNow()
+  }
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest assigned partitions
+consumer.poll(0)
+val partitions = consumer.assignment()
+consumer.pause(partitions)
+partitions.asScala.toSet
+  }
+
+  /**
+   * Resolves the specific offsets based on Kafka seek positions.
+   * This method resolves offset value -1 to the latest and -2 to the
+   * earliest Kafka seek position.
+   */
+  def fetchSpecificOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99329495
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/ConsumerStrategy.scala
 ---
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import scala.collection.JavaConverters._
+
+import org.apache.kafka.clients.consumer.{Consumer, KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+/**
+ * Subscribe allows you to subscribe to a fixed collection of topics.
+ * SubscribePattern allows you to use a regex to specify topics of 
interest.
+ * Note that unlike the 0.8 integration, * using Subscribe or 
SubscribePattern
+ * should respond to adding partitions during a running stream.
+ * Finally, Assign allows you to specify a fixed collection of partitions.
+ * All three strategies have overloaded constructors that allow you to 
specify
+ * the starting offset for a particular partition.
+ */
+sealed trait ConsumerStrategy {
+  def createConsumer(kafkaParams: ju.Map[String, Object]): 
Consumer[Array[Byte], Array[Byte]]
+}
+
+/**
--- End diff --

nit: short docs can be put in a single line `/** blah blah */`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99330214
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
--- End diff --

How about `KafkaRangeLimit` or `KafkaOffsetRangeLimit`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99331072
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -71,94 +76,145 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 .map { k => k.drop(6).toString -> parameters(k) }
 .toMap
 
-val deserClassName = classOf[ByteArrayDeserializer].getName
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val startingOffsets =
+val startingStreamOffsets =
   
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
 case Some("latest") => LatestOffsets
 case Some("earliest") => EarliestOffsets
 case Some(json) => 
SpecificOffsets(JsonUtils.partitionOffsets(json))
 case None => LatestOffsets
   }
 
-val kafkaParamsForDriver =
-  ConfigUpdater("source", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Set to "earliest" to avoid exceptions. However, KafkaSource 
will fetch the initial
-// offsets by itself instead of counting on KafkaConsumer.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-// So that consumers in the driver does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-// So that the driver does not pull too much data
-.set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new 
java.lang.Integer(1))
-
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
-
-val kafkaParamsForExecutors =
-  ConfigUpdater("executor", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Make sure executors do only what the driver tells them.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+val kafkaOffsetReader = new KafkaTopicPartitionOffsetReader(
+  strategy(caseInsensitiveParams),
+  kafkaParamsForDriver(specifiedKafkaParams),
+  parameters,
+  driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-// So that consumers in executors do not mess with any existing 
group id
-.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+new KafkaSource(
+  sqlContext,
+  kafkaOffsetReader,
+  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+  parameters,
+  metadataPath,
+  startingStreamOffsets,
+  failOnDataLoss(caseInsensitiveParams))
+  }
 
-// So that consumers in executors does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this 
insensitivity is enforced
+   *   by the Map that is passed to the function.
+   */
+  override def createRelation(
+  sqlContext: SQLContext,
+  parameters: Map[String, String]): BaseRelation = {
+  validateBatchOptions(parameters)
--- End diff --

incorrect indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99328473
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
 
-private[kafka010] case object EarliestOffsets extends StartingOffsets
+/**
+ * Bind to the earliest offsets in Kafka
+ */
+private[kafka010] case object EarliestOffsets extends KafkaOffsets
 
-private[kafka010] case object LatestOffsets extends StartingOffsets
+/**
+ * Bind to the latest offsets in Kafka
+ */
+private[kafka010] case object LatestOffsets extends KafkaOffsets
 
+/**
+ * Bind to the specific offsets. A offset == -1 binds to the latest
+ * offset, and offset == -2 binds to the earliest offset.
+ */
 private[kafka010] case class SpecificOffsets(
-  partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets
+partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsets
+
+private[kafka010] object KafkaOffsets {
+  // Used to denote unbounded offset positions
--- End diff --

nit: Used to represent unresolved offset limits as longs
"unbounded" sounds like its infinite, or something.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99328317
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
 
-private[kafka010] case object EarliestOffsets extends StartingOffsets
+/**
+ * Bind to the earliest offsets in Kafka
--- End diff --

nit: better docs. this is object, not a method. say what the object 
represents. "Bind to earliest offsets..." is like docs for a method


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99327772
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -149,12 +163,12 @@ private[kafka010] case class CachedKafkaConsumer 
private(
   //   offset   earliestOffset   min(untilOffset,latestOffset)   
max(untilOffset, latestOffset)
   val warningMessage =
 s"""
-   |The current available offset range is [$earliestOffset, 
$latestOffset).
-   | Offset ${offset} is out of range, and records in [$offset, 
$earliestOffset) will be
+   |The current available offset range is [${range.earliest}, 
${range.latest}).
--- End diff --

nit: same as above.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99327739
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -124,13 +138,13 @@ private[kafka010] case class CachedKafkaConsumer 
private(
   //   offset   untilOffset   earliestOffset   latestOffset
   val warningMessage =
 s"""
-  |The current available offset range is [$earliestOffset, 
$latestOffset).
+  |The current available offset range is [${range.earliest}, 
${range.latest}).
--- End diff --

nit: offset range is $range


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99329007
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
--- End diff --

nit: what the long name? why not simply KafkaOffsetReader as it was before?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99332314
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String) extends Logging {
+  /**
+   * Used to ensure execute fetch operations execute in an 
UninterruptibleThread
+   */
+  val kafkaReaderThread = Executors.newSingleThreadExecutor(new 
ThreadFactory {
+override def newThread(r: Runnable): Thread = {
+  val t = new UninterruptibleThread("Kafka Offset Reader") {
+override def run(): Unit = {
+  r.run()
+}
+  }
+  t.setDaemon(true)
+  t
+}
+  })
+  val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close(): Unit = {
+consumer.close()
+kafkaReaderThread.shutdownNow()
+  }
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest assigned partitions
+consumer.poll(0)
+val partitions = consumer.assignment()
+consumer.pause(partitions)
+partitions.asScala.toSet
+  }
+
+  /**
+   * Resolves the specific offsets based on Kafka seek positions.
+   * This method resolves offset value -1 to the latest and -2 to the
+   * earliest Kafka seek position.
+   */
+  def fetchSpecificOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99329545
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -19,14 +19,31 @@ package org.apache.spark.sql.kafka010
 
 import org.apache.kafka.common.TopicPartition
 
-/*
- * Values that can be specified for config startingOffsets
+/**
+ * Values that can be specified to configure starting,
+ * ending, and specific offsets.
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
--- End diff --

The more I think, I feel that its weird to name this generic 
"KafkaOffsets". Let's brainstorm on this.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99328737
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+   override val sqlContext: SQLContext,
+   kafkaReader: 
KafkaTopicPartitionOffsetReader,
+   executorKafkaParams: ju.Map[String, 
Object],
+   sourceOptions: Map[String, String],
+   failOnDataLoss: Boolean,
+   startingOffsets: KafkaOffsets,
+   endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
--- End diff --

incorrect indent


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99332003
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaTopicPartitionOffsetReader.scala
 ---
@@ -0,0 +1,300 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ * The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ * by this source. These strategies directly correspond to the different 
consumption options
+ * in. This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ * [[KafkaSource]] to query for the offsets. See the docs on
+ * [[org.apache.spark.sql.kafka010.ConsumerStrategy]]
+ * for more details.
+ *
+ * Note: This class is not ThreadSafe
+ */
+private[kafka010] class KafkaTopicPartitionOffsetReader(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String) extends Logging {
+  /**
+   * Used to ensure execute fetch operations execute in an 
UninterruptibleThread
+   */
+  val kafkaReaderThread = Executors.newSingleThreadExecutor(new 
ThreadFactory {
+override def newThread(r: Runnable): Thread = {
+  val t = new UninterruptibleThread("Kafka Offset Reader") {
+override def run(): Unit = {
+  r.run()
+}
+  }
+  t.setDaemon(true)
+  t
+}
+  })
+  val execContext = ExecutionContext.fromExecutorService(kafkaReaderThread)
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close(): Unit = {
+consumer.close()
+kafkaReaderThread.shutdownNow()
+  }
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition] = runUninterruptibly {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest assigned partitions
+consumer.poll(0)
+val partitions = consumer.assignment()
+consumer.pause(partitions)
+partitions.asScala.toSet
+  }
+
+  /**
+   * Resolves the specific offsets based on Kafka seek positions.
+   * This method resolves offset value -1 to the latest and -2 to the
+   * earliest Kafka seek position.
+   */
+  def fetchSpecificOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-03 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99333285
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  private def createDF(topic: String,
--- End diff --

nit: should be
```
private def createDF(
topic: String, 
withOptions: ...
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99244144
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,28 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
--- End diff --

Reworked it. Let me know what you think.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99240245
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -334,14 +334,15 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
   def getOrCreate(
   topic: String,
   partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
+  kafkaParams: ju.Map[String, Object],
+  reuse: Boolean): CachedKafkaConsumer = synchronized {
--- End diff --

reuse existing. I changed the name to reuseExistingIfPresent.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99198943
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -22,11 +22,11 @@ import org.apache.kafka.common.TopicPartition
 /*
  * Values that can be specified for config startingOffsets
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
 
-private[kafka010] case object EarliestOffsets extends StartingOffsets
+private[kafka010] case object EarliestOffsets extends KafkaOffsets
 
-private[kafka010] case object LatestOffsets extends StartingOffsets
+private[kafka010] case object LatestOffsets extends KafkaOffsets
 
 private[kafka010] case class SpecificOffsets(
-  partitionOffsets: Map[TopicPartition, Long]) extends StartingOffsets
+  partitionOffsets: Map[TopicPartition, Long]) extends KafkaOffsets
--- End diff --

incorrect indent. i believe 4 indents on continuation of param list.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99229152
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaUtils.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+
+private[kafka010] object KafkaUtils {
--- End diff --

I dont see the need for this class. LATEST and EARLIEST is better put in 
object KafkaOffsets (trait already exists), and the other methods used to be 
part KafkaSource and may continue to be in their (unless anybody else uses it)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99227219
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 .map { k => k.drop(6).toString -> parameters(k) }
 .toMap
 
-val deserClassName = classOf[ByteArrayDeserializer].getName
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val startingOffsets =
+val startingStreamOffsets =
   
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
 case Some("latest") => LatestOffsets
 case Some("earliest") => EarliestOffsets
 case Some(json) => 
SpecificOffsets(JsonUtils.partitionOffsets(json))
 case None => LatestOffsets
   }
 
-val kafkaParamsForDriver =
-  ConfigUpdater("source", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Set to "earliest" to avoid exceptions. However, KafkaSource 
will fetch the initial
-// offsets by itself instead of counting on KafkaConsumer.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-// So that consumers in the driver does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-// So that the driver does not pull too much data
-.set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new 
java.lang.Integer(1))
-
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
-
-val kafkaParamsForExecutors =
-  ConfigUpdater("executor", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Make sure executors do only what the driver tells them.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+val kafkaOffsetReader = new KafkaOffsetReaderImpl(
+  strategy(caseInsensitiveParams),
+  kafkaParamsForDriver(specifiedKafkaParams),
+  parameters,
+  driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-// So that consumers in executors do not mess with any existing 
group id
-.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+new KafkaSource(
+  sqlContext,
+  kafkaOffsetReader,
+  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+  parameters,
+  metadataPath,
+  startingStreamOffsets,
+  failOnDataLoss(caseInsensitiveParams))
+  }
 
-// So that consumers in executors does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this 
insensitivity is enforced
+   *   by the Map that is passed to the function.
+   */
+  override def createRelation(
+sqlContext: SQLContext,
+parameters: Map[String, String]): BaseRelation = {
+validateOptions(parameters, Batch)
--- End diff --

Instead of defining a new trait called Mode, which why not do the 
following. 
- Two methods `validateBatchOptions` and `validateStreamingOptions`. Both 
of them internally call `validateCommonOptions`. 
- `createSource` calls `validateStreamingOptions`, and `createRelation` 
calls `validateBatchOptions`



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99198794
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  override def fetchTopicPartitions(): Set[TopicPartition] = {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99199482
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsets.scala
 ---
@@ -22,11 +22,11 @@ import org.apache.kafka.common.TopicPartition
 /*
  * Values that can be specified for config startingOffsets
  */
-private[kafka010] sealed trait StartingOffsets
+private[kafka010] sealed trait KafkaOffsets
 
-private[kafka010] case object EarliestOffsets extends StartingOffsets
+private[kafka010] case object EarliestOffsets extends KafkaOffsets
--- End diff --

docs.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99201167
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the df
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = JsonUtils.partitionOffsets(startPartitionOffsets)
+
+val endPartitionOffsets = Map(
+  

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99227617
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 .map { k => k.drop(6).toString -> parameters(k) }
 .toMap
 
-val deserClassName = classOf[ByteArrayDeserializer].getName
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val startingOffsets =
+val startingStreamOffsets =
   
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
 case Some("latest") => LatestOffsets
 case Some("earliest") => EarliestOffsets
 case Some(json) => 
SpecificOffsets(JsonUtils.partitionOffsets(json))
 case None => LatestOffsets
   }
 
-val kafkaParamsForDriver =
-  ConfigUpdater("source", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Set to "earliest" to avoid exceptions. However, KafkaSource 
will fetch the initial
-// offsets by itself instead of counting on KafkaConsumer.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-// So that consumers in the driver does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-// So that the driver does not pull too much data
-.set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new 
java.lang.Integer(1))
-
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
-
-val kafkaParamsForExecutors =
-  ConfigUpdater("executor", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Make sure executors do only what the driver tells them.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+val kafkaOffsetReader = new KafkaOffsetReaderImpl(
+  strategy(caseInsensitiveParams),
+  kafkaParamsForDriver(specifiedKafkaParams),
+  parameters,
+  driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-// So that consumers in executors do not mess with any existing 
group id
-.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+new KafkaSource(
+  sqlContext,
+  kafkaOffsetReader,
+  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+  parameters,
+  metadataPath,
+  startingStreamOffsets,
+  failOnDataLoss(caseInsensitiveParams))
+  }
 
-// So that consumers in executors does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this 
insensitivity is enforced
+   *   by the Map that is passed to the function.
+   */
+  override def createRelation(
+sqlContext: SQLContext,
+parameters: Map[String, String]): BaseRelation = {
+validateOptions(parameters, Batch)
+// Each running query should use its own group id. Otherwise, the 
query may be only assigned
+// partial data since Kafka will assign partitions to multiple 
consumers having the same group
+// id. Hence, we should generate a unique id for each query.
+val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
+val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase, v) }
+val specifiedKafkaParams =
+  parameters
+.keySet
+.filter(_.toLowerCase.startsWith("kafka."))
+.map { k => k.drop(6).toString -> parameters(k) }
+.toMap
 
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
+

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99197254
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
--- End diff --

what is the meaning of new in this context? this a trait which has no other 
context. why not just be `fetchEarliestOffsets(partitions)`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99200075
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -251,7 +315,43 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 }
   }
 
-  override def shortName(): String = "kafka"
+  private def validateStream(caseInsensitiveParams: Map[String, String]) = 
{
+caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_ =>
+  throw new IllegalArgumentException("ending offset not valid in 
stream mode"))
--- End diff --

nit: lets be consistent with the error message style. use "streaming 
queries" and "batch queries". instead of "stream mode"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99227485
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 .map { k => k.drop(6).toString -> parameters(k) }
 .toMap
 
-val deserClassName = classOf[ByteArrayDeserializer].getName
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val startingOffsets =
+val startingStreamOffsets =
   
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
 case Some("latest") => LatestOffsets
 case Some("earliest") => EarliestOffsets
 case Some(json) => 
SpecificOffsets(JsonUtils.partitionOffsets(json))
 case None => LatestOffsets
   }
 
-val kafkaParamsForDriver =
-  ConfigUpdater("source", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Set to "earliest" to avoid exceptions. However, KafkaSource 
will fetch the initial
-// offsets by itself instead of counting on KafkaConsumer.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-// So that consumers in the driver does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-// So that the driver does not pull too much data
-.set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new 
java.lang.Integer(1))
-
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
-
-val kafkaParamsForExecutors =
-  ConfigUpdater("executor", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Make sure executors do only what the driver tells them.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+val kafkaOffsetReader = new KafkaOffsetReaderImpl(
+  strategy(caseInsensitiveParams),
+  kafkaParamsForDriver(specifiedKafkaParams),
+  parameters,
+  driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-// So that consumers in executors do not mess with any existing 
group id
-.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+new KafkaSource(
+  sqlContext,
+  kafkaOffsetReader,
+  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+  parameters,
+  metadataPath,
+  startingStreamOffsets,
+  failOnDataLoss(caseInsensitiveParams))
+  }
 
-// So that consumers in executors does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this 
insensitivity is enforced
+   *   by the Map that is passed to the function.
+   */
+  override def createRelation(
+sqlContext: SQLContext,
+parameters: Map[String, String]): BaseRelation = {
+validateOptions(parameters, Batch)
+// Each running query should use its own group id. Otherwise, the 
query may be only assigned
+// partial data since Kafka will assign partitions to multiple 
consumers having the same group
+// id. Hence, we should generate a unique id for each query.
+val uniqueGroupId = s"spark-kafka-relation-${UUID.randomUUID}"
+val caseInsensitiveParams = parameters.map { case (k, v) => 
(k.toLowerCase, v) }
+val specifiedKafkaParams =
+  parameters
+.keySet
+.filter(_.toLowerCase.startsWith("kafka."))
+.map { k => k.drop(6).toString -> parameters(k) }
+.toMap
 
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
+

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99226571
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala
 ---
@@ -384,6 +384,9 @@ class KafkaSourceSuite extends KafkaSourceTest {
   }
 }
 
+// Specifying an ending offset
+testBadOptions("endingOffsets" -> "latest")("Ending offset not valid 
in stream mode")
--- End diff --

dont use the term mode. it meaning nothing to external users.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99198641
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  override def fetchTopicPartitions(): Set[TopicPartition] = {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99200239
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -251,7 +315,43 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 }
   }
 
-  override def shortName(): String = "kafka"
+  private def validateStream(caseInsensitiveParams: Map[String, String]) = 
{
--- End diff --

validateOptionsForStreaming, validateOptionsForBatch
OR
validateStreamingOptions, validateBatchOptions


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99193469
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,28 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
--- End diff --

nit: Does this piece of code need to resolve the range need to be inside 
the NextIterator? This is cause a lot of unnecessary nesting. Instead of making 
the range var, you can resolve the range above and then create the NextIterator.

Furthermore, why use rawConsumer directly and expose it? Why not use 
`CachedKafkaConsumer.getAvailableOffsetRange()`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99196664
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
--- End diff --

and why a single bullet point?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99192856
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -42,7 +42,7 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private var consumer = createConsumer
+  var rawConsumer = createConsumer
--- End diff --

and why renamed?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99227773
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -278,5 +378,13 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 private[kafka010] object KafkaSourceProvider {
   private val STRATEGY_OPTION_KEYS = Set("subscribe", "subscribepattern", 
"assign")
   private val STARTING_OFFSETS_OPTION_KEY = "startingoffsets"
+  private val ENDING_OFFSETS_OPTION_KEY = "endingoffsets"
   private val FAIL_ON_DATA_LOSS_OPTION_KEY = "failondataloss"
+
+  // Used to check parameters for different source modes
+  private sealed trait Mode
--- End diff --

Commented elsewhere, Mode should not be required.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99199931
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -251,7 +315,43 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 }
   }
 
-  override def shortName(): String = "kafka"
+  private def validateStream(caseInsensitiveParams: Map[String, String]) = 
{
+caseInsensitiveParams.get(ENDING_OFFSETS_OPTION_KEY).map(_ =>
+  throw new IllegalArgumentException("ending offset not valid in 
stream mode"))
+  }
+
+  private def validateBatch(caseInsensitiveParams: Map[String, String]) = {
+
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
+  case Some("earliest") => // good to go
+  case Some("latest") =>
+throw new IllegalArgumentException("starting relation offset can't 
be latest")
--- End diff --

"Starting offsets can't be latest for batch queries on Kafka"


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99201123
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the df
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = JsonUtils.partitionOffsets(startPartitionOffsets)
+
+val endPartitionOffsets = Map(
+  

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99196592
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
--- End diff --

nit: extra space.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99227400
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala
 ---
@@ -71,94 +77,152 @@ private[kafka010] class KafkaSourceProvider extends 
StreamSourceProvider
 .map { k => k.drop(6).toString -> parameters(k) }
 .toMap
 
-val deserClassName = classOf[ByteArrayDeserializer].getName
-// Each running query should use its own group id. Otherwise, the 
query may be only assigned
-// partial data since Kafka will assign partitions to multiple 
consumers having the same group
-// id. Hence, we should generate a unique id for each query.
-val uniqueGroupId = 
s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
-
-val startingOffsets =
+val startingStreamOffsets =
   
caseInsensitiveParams.get(STARTING_OFFSETS_OPTION_KEY).map(_.trim.toLowerCase) 
match {
 case Some("latest") => LatestOffsets
 case Some("earliest") => EarliestOffsets
 case Some(json) => 
SpecificOffsets(JsonUtils.partitionOffsets(json))
 case None => LatestOffsets
   }
 
-val kafkaParamsForDriver =
-  ConfigUpdater("source", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Set to "earliest" to avoid exceptions. However, KafkaSource 
will fetch the initial
-// offsets by itself instead of counting on KafkaConsumer.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
-
-// So that consumers in the driver does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
-
-// So that the driver does not pull too much data
-.set(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, new 
java.lang.Integer(1))
-
-// If buffer config is not set, set it to reasonable value to work 
around
-// buffer issues (see KAFKA-3135)
-.setIfUnset(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: 
java.lang.Integer)
-.build()
-
-val kafkaParamsForExecutors =
-  ConfigUpdater("executor", specifiedKafkaParams)
-.set(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, deserClassName)
-.set(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
deserClassName)
-
-// Make sure executors do only what the driver tells them.
-.set(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
+val kafkaOffsetReader = new KafkaOffsetReaderImpl(
+  strategy(caseInsensitiveParams),
+  kafkaParamsForDriver(specifiedKafkaParams),
+  parameters,
+  driverGroupIdPrefix = s"$uniqueGroupId-driver")
 
-// So that consumers in executors do not mess with any existing 
group id
-.set(ConsumerConfig.GROUP_ID_CONFIG, s"$uniqueGroupId-executor")
+new KafkaSource(
+  sqlContext,
+  kafkaOffsetReader,
+  kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId),
+  parameters,
+  metadataPath,
+  startingStreamOffsets,
+  failOnDataLoss(caseInsensitiveParams))
+  }
 
-// So that consumers in executors does not commit offsets 
unnecessarily
-.set(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+  /**
+   * Returns a new base relation with the given parameters.
+   *
+   * @note The parameters' keywords are case insensitive and this 
insensitivity is enforced
+   *   by the Map that is passed to the function.
+   */
+  override def createRelation(
+sqlContext: SQLContext,
--- End diff --

incorrect indentation.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99200748
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the df
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = JsonUtils.partitionOffsets(startPartitionOffsets)
+
+val endPartitionOffsets = Map(
+  

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99198390
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  override def fetchTopicPartitions(): Set[TopicPartition] = {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99197356
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
--- End diff --

and whats the difference between earliest and starting offsets?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99228186
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaUtils.scala
 ---
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import org.apache.spark.SparkContext
+import org.apache.spark.scheduler.ExecutorCacheTaskLocation
+
+private[kafka010] object KafkaUtils {
+
+  // Used to denote unbounded offset positions
+  val LATEST = -1L
--- End diff --

Having these constants here does not make sense. Better to have them in an 
object KafkaOffsets and put these numbers in them. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99192771
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -42,7 +42,7 @@ private[kafka010] case class CachedKafkaConsumer private(
 
   private val groupId = 
kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG).asInstanceOf[String]
 
-  private var consumer = createConsumer
+  var rawConsumer = createConsumer
--- End diff --

exposing internal var is generally not a good idea. A better approach is be 
to add the necessary methods (for which you need the consumer) in the class 
CachedKafkaConsumer. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99196854
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
--- End diff --

would be good to mention that this class is not threadsafe. i see a lot of 
vars and all. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99196372
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
--- End diff --

this trait a little weird. `fetchTopicPartitions()` fetches topic and 
partitions of what?
clarifying these in the docs would be good.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99196482
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * @return The Set of TopicPartitions for a given topic
+   */
+  def fetchTopicPartitions(): Set[TopicPartition]
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
--- End diff --

This docs seems wrong. Name says it should fetch offsets, but docs says it 
sets something?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99195732
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala
 ---
@@ -334,14 +334,15 @@ private[kafka010] object CachedKafkaConsumer extends 
Logging {
   def getOrCreate(
   topic: String,
   partition: Int,
-  kafkaParams: ju.Map[String, Object]): CachedKafkaConsumer = 
synchronized {
+  kafkaParams: ju.Map[String, Object],
+  reuse: Boolean): CachedKafkaConsumer = synchronized {
--- End diff --

Does this mean reuse existing one, OR allow reuse in future? 



---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-02-02 Thread tdas
Github user tdas commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r99195778
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,389 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
--- End diff --

scala docs. 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98749100
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
+  // Late bind the offset range
+  val fromOffset = if (range.fromOffset < 0) {
+
consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
+consumer.rawConsumer.position(range.topicPartition)
+  } else {
+range.fromOffset
+  }
+  val untilOffset = if (range.untilOffset < 0) {
+
consumer.rawConsumer.seekToEnd(ju.Arrays.asList(range.topicPartition))
--- End diff --

nit: add assert(range.fromOffset == -1) to avoid breaking it in future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746133
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
 offsets
   }
 
+  def cleanupLogs(): Unit = {
+server.logManager.cleanupLogs()
+  }
+
+  def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = 
{
+val kc = new KafkaConsumer[String, String](consumerConfiguration)
+logInfo("Created consumer to get latest offsets")
--- End diff --

nit: please fix the log


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746146
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -249,6 +249,24 @@ class KafkaTestUtils extends Logging {
 offsets
   }
 
+  def cleanupLogs(): Unit = {
+server.logManager.cleanupLogs()
+  }
+
+  def getEarliestOffsets(topics: Set[String]): Map[TopicPartition, Long] = 
{
+val kc = new KafkaConsumer[String, String](consumerConfiguration)
+logInfo("Created consumer to get latest offsets")
+kc.subscribe(topics.asJavaCollection)
+kc.poll(0)
+val partitions = kc.assignment()
+kc.pause(partitions)
+kc.seekToBeginning(partitions)
+val offsets = partitions.asScala.map(p => p -> kc.position(p)).toMap
+kc.close()
+logInfo("Closed consumer to get latest offsets")
--- End diff --

nit: please fix the log


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98746070
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaTestUtils.scala
 ---
@@ -274,6 +292,11 @@ class KafkaTestUtils extends Logging {
 props.put("log.flush.interval.messages", "1")
 props.put("replica.socket.timeout.ms", "1500")
 props.put("delete.topic.enable", "true")
+withBrokerProps.map { p =>
--- End diff --

nit: you can change the type of `withBrokerProps` to `Map[String, Object]`. 
Then here you can just use `props.putAll(withBrokerProps.asJava)`.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98756883
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -108,7 +113,14 @@ private[kafka010] class KafkaOffsetReaderImpl(
 
   def close(): Unit = consumer.close()
 
-  def fetchSpecificStartingOffsets(
+  override def fetchTopicPartitions(): Set[TopicPartition] = {
+assert(Thread.currentThread().isInstanceOf[UninterruptibleThread])
+// Poll to get the latest assigned partitions
+consumer.poll(0)
+consumer.assignment().asScala.toSet
--- End diff --

nit: please also call `pause` like this to avoid fetching the real data 
when reusing the relation.
```
val partitions = consumer.assignment()
consumer.pause(partitions)
partitions.asScala.toSet
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-31 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98749048
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceRDD.scala
 ---
@@ -135,7 +136,24 @@ private[kafka010] class KafkaSourceRDD(
 } else {
   new NextIterator[ConsumerRecord[Array[Byte], Array[Byte]]]() {
 val consumer = CachedKafkaConsumer.getOrCreate(
-  range.topic, range.partition, executorKafkaParams)
+range.topic, range.partition, executorKafkaParams, 
reuseKafkaConsumer)
+if (range.fromOffset < 0 || range.untilOffset < 0) {
+  // Late bind the offset range
+  val fromOffset = if (range.fromOffset < 0) {
+
consumer.rawConsumer.seekToBeginning(ju.Arrays.asList(range.topicPartition))
--- End diff --

nit: add `assert(range.fromOffset == -2)` to avoid breaking it in future.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-27 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98237559
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+var df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the 
reader
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
--- End diff --

This no longer holds now that we're binding in the executor, right?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-26 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r98108260
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+var df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the 
reader
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
--- End diff --

nit: could you add the following test below this line to make the semantics 
clear?
```
// The same DataFrame instance should return the same result
checkAnswer(df, (0 to 20).map(_.toString).toDF)
```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97690998
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  def fetchSpecificStartingOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 
Long] =
+withRetriesWithoutInterrupt {
+  // Poll to get the latest assigned partitions
+  consumer.poll(0)
+  val partitions = consumer.assignment()
+  

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97690962
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,370 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executors, ThreadFactory}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  /**
+   * Closes the connection to Kafka, and cleans up state.
+   */
+  def close()
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets of partitions.
+   */
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the latest offsets of partitions.
+   */
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  /**
+   * Fetch the earliest offsets for newly discovered partitions. The 
return result may not contain
+   * some partitions if they are deleted.
+   */
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  def fetchSpecificStartingOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 
Long] =
+withRetriesWithoutInterrupt {
+  // Poll to get the latest assigned partitions
+  consumer.poll(0)
+  val partitions = consumer.assignment()
+  

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97692374
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala
 ---
@@ -311,7 +271,7 @@ private[kafka010] class KafkaSource(
 
 // Create an RDD that reads from Kafka and get the (key, value) pair 
as byte arrays.
 val rdd = new KafkaSourceRDD(
-  sc, executorKafkaParams, offsetRanges, pollTimeoutMs, 
failOnDataLoss).map { cr =>
+  sc, executorKafkaParams, offsetRanges, pollTimeoutMs, 
failOnDataLoss, true).map { cr =>
--- End diff --

nit: `true` -> `reuseKafkaConsumer = true`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97690727
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+  assert(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  assert(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+if (fromPartitionOffsets.keySet.size != 
untilPartitionOffsets.keySet.size) {
+  throw new IllegalStateException("Kafka return different topic 
partitions " +
+"for starting and ending offsets")
+}
+
+val sortedExecutors = 
KafkaUtils.getSortedExecutorList(sqlContext.sparkContext)
--- End diff --

nit: not used any more (these 3 lines)


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97690665
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+var df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the 
reader
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97691654
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+  assert(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  assert(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+if (fromPartitionOffsets.keySet.size != 
untilPartitionOffsets.keySet.size) {
+  throw new IllegalStateException("Kafka return different topic 
partitions " +
--- End diff --

nit: please include `fromPartitionOffsets` and `untilPartitionOffsets` to 
the exception message so that it's easy to debug such failure.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97690588
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.sql.QueryTest
+import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession}
+
+class KafkaRelationSuite extends QueryTest with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("explicit earliest to latest offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+val reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+var df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+
+// "latest" should late bind to the current (latest) offset in the 
reader
+testUtils.sendMessages(topic, (21 to 29).map(_.toString).toArray, 
Some(2))
+df = reader.selectExpr("CAST(value AS STRING)")
+checkAnswer(df, (0 to 29).map(_.toString).toDF)
+  }
+
+  test("default starting and ending offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Implicit offset values, should default to earliest and latest
+val df = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .load()
+  .selectExpr("CAST(value AS STRING)")
+// Test that we default to "earliest" and "latest"
+checkAnswer(df, (0 to 20).map(_.toString).toDF)
+  }
+
+  test("explicit offsets") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Test explicitly specified offsets
+val startPartitionOffsets = Map(
+  new TopicPartition(topic, 0) -> -2L, // -2 => earliest
+  new TopicPartition(topic, 1) -> -2L,
+  new TopicPartition(topic, 2) -> 0L   // explicit earliest
+)
+val startingOffsets = 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97691519
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+  assert(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  assert(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+if (fromPartitionOffsets.keySet.size != 
untilPartitionOffsets.keySet.size) {
+  throw new IllegalStateException("Kafka return different topic 
partitions " +
+"for starting and ending offsets")
+}
+
+val sortedExecutors = 
KafkaUtils.getSortedExecutorList(sqlContext.sparkContext)
+val numExecutors = sortedExecutors.length
+logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+// Calculate offset ranges
+val offsetRanges = untilPartitionOffsets.keySet.map { tp =>
+  val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+  // This should not happen since topicPartitions contains all 
partitions not in
+  // fromPartitionOffsets
+  throw new IllegalStateException(s"$tp doesn't have a from 
offset")
+  }
+  val untilOffset = untilPartitionOffsets(tp)
+  KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, None)
+}.toArray
+
+logInfo("GetBatch generating RDD of offset range: " +
+  offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+
+// Create an RDD that reads from Kafka and get the (key, value) pair 
as byte arrays.
+val rdd = new KafkaSourceRDD(
+  sqlContext.sparkContext, executorKafkaParams, offsetRanges,
+  pollTimeoutMs, failOnDataLoss, false).map { cr =>
--- End diff --

nit: `false` -> `reuseKafkaConsumer = false`


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97657263
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends SparkFunSuite with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("batch processing earliest to latest") {
--- End diff --

Could you split this test to 3 tests to test each feature separately?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97626627
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executor, LinkedBlockingQueue}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  def close()
+
+  def fetchSpecificStartingOffsets(
--- End diff --

nit: could you add comments for these methods?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97627081
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executor, LinkedBlockingQueue}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  def close()
+
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 
Long] =
+withRetriesWithoutInterrupt {
+  // Poll to get the latest assigned partitions
+  consumer.poll(0)
+  val partitions = consumer.assignment()
+  consumer.pause(partitions)
+  assert(partitions.asScala == partitionOffsets.keySet,
+"If startingOffsets contains specific offsets, you must specify 
all TopicPartitions.\n" +
+  "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+  s"Specified: ${partitionOffsets.keySet} Assigned: 
${partitions.asScala}")
+  logDebug(s"Partitions assigned to consumer: 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97661376
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+
+  require(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  require(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+val topicPartitions = 
fromPartitionOffsets.keySet.intersect(untilPartitionOffsets.keySet)
+
+
+val sortedExecutors = 
KafkaUtils.getSortedExecutorList(sqlContext.sparkContext)
+val numExecutors = sortedExecutors.length
+logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+// Calculate offset ranges
+val offsetRanges = topicPartitions.map { tp =>
+  val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+  // This should not happen since topicPartitions contains all 
partitions not in
+  // fromPartitionOffsets
+  throw new IllegalStateException(s"$tp doesn't have a from 
offset")
+  }
+  val untilOffset = untilPartitionOffsets(tp)
+  val preferredLoc = if (numExecutors > 0) {
+// This allows cached KafkaConsumers in the executors to be 
re-used to read the same
+// partition in every batch.
+Some(sortedExecutors(KafkaUtils.floorMod(tp.hashCode, 
numExecutors)))
+  } else None
+  KafkaSourceRDDOffsetRange(tp, fromOffset, untilOffset, preferredLoc)
+}.toArray
+
+logInfo("GetBatch generating RDD of offset range: " +
+  offsetRanges.sortBy(_.topicPartition.toString).mkString(", "))
+
+// Create an RDD that reads from Kafka and get the (key, value) pair 
as byte arrays.
+val rdd = new KafkaSourceRDD(
--- End diff --

I found `df.union(df)` will just union the same RDD which breaks the group 
id assumption. The same CachedKafkaConsumer will be used by two different 
tasks. For batch queries, caching consumers is not necessary. Could you add a 
flag to KafkaSourceRDD to not use the cached consumer? It's better to also 
write a test to cover this case. In addition, this test should one use one 
partition in order to launch two tasks from different RDDs at the same 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97665946
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+
+  require(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  require(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+val topicPartitions = 
fromPartitionOffsets.keySet.intersect(untilPartitionOffsets.keySet)
--- End diff --

It's better to throw an exception rather than ignoring the deleted 
partitions.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97628344
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala
 ---
@@ -0,0 +1,376 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+import java.util.concurrent.{Executor, LinkedBlockingQueue}
+
+import scala.collection.JavaConverters._
+import scala.concurrent.{ExecutionContext, Future}
+import scala.concurrent.duration.Duration
+import scala.util.control.NonFatal
+
+import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, 
KafkaConsumer}
+import 
org.apache.kafka.clients.consumer.internals.NoOpConsumerRebalanceListener
+import org.apache.kafka.common.TopicPartition
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy
+import org.apache.spark.sql.types._
+import org.apache.spark.util.{ThreadUtils, UninterruptibleThread}
+
+
+private[kafka010] trait KafkaOffsetReader {
+
+  def close()
+
+  def fetchSpecificStartingOffsets(
+partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long]
+
+  def fetchEarliestOffsets(): Map[TopicPartition, Long]
+
+  def fetchLatestOffsets(): Map[TopicPartition, Long]
+
+  def fetchNewPartitionEarliestOffsets(
+newPartitions: Seq[TopicPartition]): Map[TopicPartition, Long]
+}
+
+/**
+ * This class uses Kafka's own [[KafkaConsumer]] API to read data offsets 
from Kafka.
+ *
+ * - The [[ConsumerStrategy]] class defines which Kafka topics and 
partitions should be read
+ *   by this source. These strategies directly correspond to the different 
consumption options
+ *   in . This class is designed to return a configured [[KafkaConsumer]] 
that is used by the
+ *   [[KafkaSource]] to query for the offsets. See the docs on
+ *   [[org.apache.spark.sql.kafka010.KafkaOffsetReader.ConsumerStrategy]] 
for more details.
+ */
+private[kafka010] class KafkaOffsetReaderImpl(
+consumerStrategy: ConsumerStrategy,
+driverKafkaParams: ju.Map[String, Object],
+readerOptions: Map[String, String],
+driverGroupIdPrefix: String)
+  extends KafkaOffsetReader with Logging {
+
+  /**
+   * A KafkaConsumer used in the driver to query the latest Kafka offsets. 
This only queries the
+   * offsets and never commits them.
+   */
+  protected var consumer = createConsumer()
+
+  private val maxOffsetFetchAttempts =
+readerOptions.getOrElse("fetchOffset.numRetries", "3").toInt
+
+  private val offsetFetchAttemptIntervalMs =
+readerOptions.getOrElse("fetchOffset.retryIntervalMs", "1000").toLong
+
+  private var groupId: String = null
+
+  private var nextId = 0
+
+  private def nextGroupId(): String = {
+groupId = driverGroupIdPrefix + "-" + nextId
+nextId += 1
+groupId
+  }
+
+  override def toString(): String = consumerStrategy.toString
+
+  def close(): Unit = consumer.close()
+
+  /**
+   * Set consumer position to specified offsets, making sure all 
assignments are set.
+   */
+  def fetchSpecificStartingOffsets(
+  partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, 
Long] =
+withRetriesWithoutInterrupt {
+  // Poll to get the latest assigned partitions
+  consumer.poll(0)
+  val partitions = consumer.assignment()
+  consumer.pause(partitions)
+  assert(partitions.asScala == partitionOffsets.keySet,
+"If startingOffsets contains specific offsets, you must specify 
all TopicPartitions.\n" +
+  "Use -1 for latest, -2 for earliest, if you don't care.\n" +
+  s"Specified: ${partitionOffsets.keySet} Assigned: 
${partitions.asScala}")
+  logDebug(s"Partitions assigned to consumer: 

[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97629645
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaProvider.scala
 ---
@@ -28,19 +28,27 @@ import 
org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SQLContext
 import org.apache.spark.sql.execution.streaming.Source
-import org.apache.spark.sql.kafka010.KafkaSource._
-import org.apache.spark.sql.sources.{DataSourceRegister, 
StreamSourceProvider}
+import org.apache.spark.sql.kafka010.KafkaOffsetReader.{AssignStrategy, 
SubscribePatternStrategy, SubscribeStrategy}
+import org.apache.spark.sql.sources._
 import org.apache.spark.sql.types.StructType
 
 /**
  * The provider class for the [[KafkaSource]]. This provider is designed 
such that it throws
  * IllegalArgumentException when the Kafka Dataset is created, so that it 
can catch
  * missing options even before the query is started.
  */
-private[kafka010] class KafkaSourceProvider extends StreamSourceProvider
-  with DataSourceRegister with Logging {
+private[kafka010] class KafkaProvider extends DataSourceRegister with 
StreamSourceProvider
+  with RelationProvider with Logging {
+  import KafkaProvider._
 
-  import KafkaSourceProvider._
+  // Used to check parameters for different source modes
+  private sealed trait Mode
--- End diff --

nit: could you move these classes and `deserClassName` to `object 
KafkaProvider`?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97657985
  
--- Diff: 
external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaRelationSuite.scala
 ---
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.util.concurrent.atomic.AtomicInteger
+
+import org.apache.kafka.common.TopicPartition
+import org.scalatest.BeforeAndAfter
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.test.SharedSQLContext
+
+class KafkaRelationSuite extends SparkFunSuite with BeforeAndAfter with 
SharedSQLContext {
+
+  import testImplicits._
+
+  private val topicId = new AtomicInteger(0)
+
+  private var testUtils: KafkaTestUtils = _
+
+  private def newTopic(): String = s"topic-${topicId.getAndIncrement()}"
+
+  private def assignString(topic: String, partitions: Iterable[Int]): 
String = {
+JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p)))
+  }
+
+  override def beforeAll(): Unit = {
+super.beforeAll()
+testUtils = new KafkaTestUtils
+testUtils.setup()
+  }
+
+  override def afterAll(): Unit = {
+if (testUtils != null) {
+  testUtils.teardown()
+  testUtils = null
+  super.afterAll()
+}
+  }
+
+  test("batch processing earliest to latest") {
+val topic = newTopic()
+testUtils.createTopic(topic, partitions = 3)
+testUtils.sendMessages(topic, (0 to 9).map(_.toString).toArray, 
Some(0))
+testUtils.sendMessages(topic, (10 to 19).map(_.toString).toArray, 
Some(1))
+testUtils.sendMessages(topic, Array("20"), Some(2))
+
+// Specify explicit earliest and latest offset values
+var reader = spark
+  .read
+  .format("kafka")
+  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
+  .option("subscribe", topic)
+  .option("startingOffsets", "earliest")
+  .option("endingOffsets", "latest")
+  .load()
+assert(reader.count() === 21)
--- End diff --

You can extend QueryTest rather than SparkFunSuite to use `checkAnswer` 
like this:
```Scala
var df = spark
  .read
  .format("kafka")
  .option("kafka.bootstrap.servers", testUtils.brokerAddress)
  .option("subscribe", topic)
  .option("startingOffsets", "earliest")
  .option("endingOffsets", "latest")
  .load()
  .selectExpr("CAST(value AS STRING)")

checkAnswer(df, (0 to 20).map(_.toString).toDF)

```


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97664854
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+
+  require(startingOffsets != LatestOffsets,
+"Starting offset not allowed to be set to latest offsets.")
+  require(endingOffsets != EarliestOffsets,
+"Ending offset not allowed to be set to earliest offsets.")
+
+  private val pollTimeoutMs = sourceOptions.getOrElse(
+"kafkaConsumer.pollTimeoutMs",
+sqlContext.sparkContext.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString
+  ).toLong
+
+  override def schema: StructType = KafkaOffsetReader.kafkaSchema
+
+  override def buildScan(): RDD[Row] = {
+// Leverage the KafkaReader to obtain the relevant partition offsets
+val fromPartitionOffsets = getPartitionOffsets(startingOffsets)
+val untilPartitionOffsets = getPartitionOffsets(endingOffsets)
+// Obtain topicPartitions in both from and until partition offset, 
ignoring
+// topic partitions that were added and/or deleted between the two 
above calls.
+val topicPartitions = 
fromPartitionOffsets.keySet.intersect(untilPartitionOffsets.keySet)
+
+
+val sortedExecutors = 
KafkaUtils.getSortedExecutorList(sqlContext.sparkContext)
+val numExecutors = sortedExecutors.length
+logDebug("Sorted executors: " + sortedExecutors.mkString(", "))
+
+// Calculate offset ranges
+val offsetRanges = topicPartitions.map { tp =>
+  val fromOffset = fromPartitionOffsets.get(tp).getOrElse {
+  // This should not happen since topicPartitions contains all 
partitions not in
+  // fromPartitionOffsets
+  throw new IllegalStateException(s"$tp doesn't have a from 
offset")
+  }
+  val untilOffset = untilPartitionOffsets(tp)
+  val preferredLoc = if (numExecutors > 0) {
+// This allows cached KafkaConsumers in the executors to be 
re-used to read the same
+// partition in every batch.
+Some(sortedExecutors(KafkaUtils.floorMod(tp.hashCode, 
numExecutors)))
--- End diff --

You don't need to set the preferred locations after changing to not use the 
cached consumers.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97666273
  
--- Diff: 
external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaRelation.scala
 ---
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.{util => ju}
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.sources.{BaseRelation, TableScan}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+
+private[kafka010] class KafkaRelation(
+override val sqlContext: SQLContext,
+kafkaReader: KafkaOffsetReader,
+executorKafkaParams: ju.Map[String, Object],
+sourceOptions: Map[String, String],
+failOnDataLoss: Boolean,
+startingOffsets: KafkaOffsets,
+endingOffsets: KafkaOffsets)
+  extends BaseRelation with TableScan with Logging {
+
+  require(startingOffsets != LatestOffsets,
--- End diff --

nit: changed it to `assert` since the parameters have already been 
validated.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread zsxwing
Github user zsxwing commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97637198
  
--- Diff: 
external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 ---
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
--- End diff --

The cost of keeping the class name is pretty low. Just discussed with 
@marmbrus @tdas offline and we agreed to not change the name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-24 Thread tcondie
Github user tcondie commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97620911
  
--- Diff: 
external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 ---
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
--- End diff --

That's true, but revised Provider not only provides a Source but also a 
Relation, hence the decision to rename to something more general. Not clear if 
this outweighs the risks you've pointed out. @tdas @zsxwing 


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-23 Thread HyukjinKwon
Github user HyukjinKwon commented on a diff in the pull request:

https://github.com/apache/spark/pull/16686#discussion_r97462264
  
--- Diff: 
external/kafka-0-10-sql/src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
 ---
@@ -1 +1 @@
-org.apache.spark.sql.kafka010.KafkaSourceProvider
+org.apache.spark.sql.kafka010.KafkaProvider
--- End diff --

Hi @tcondie, I just happened to look at this PR. I just wonder if this 
breaks existing codes that use 
`.format("org.apache.spark.sql.kafka010.KafkaSourceProvider")` although almost 
no users use this by that name.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org



[GitHub] spark pull request #16686: [SPARK-18682][SS] Batch Source for Kafka

2017-01-23 Thread tcondie
GitHub user tcondie opened a pull request:

https://github.com/apache/spark/pull/16686

[SPARK-18682][SS] Batch Source for Kafka

## What changes were proposed in this pull request?

Today, you can start a stream that reads from kafka. However, given kafka's 
configurable retention period, it seems like sometimes you might just want to 
read all of the data that is available now. As such we should add a version 
that works with spark.read as well.
The options should be the same as the streaming kafka source, with the 
following differences:
startingOffsets should default to earliest, and should not allow latest 
(which would always be empty).
endingOffsets should also be allowed and should default to latest. the same 
assign json format as startingOffsets should also be accepted.
It would be really good, if things like .limit(n) were enough to prevent 
all the data from being read (this might just work).

## How was this patch tested?

KafkaRelationSuite was added for testing batch queries via KafkaUtils. 


You can merge this pull request into a Git repository by running:

$ git pull https://github.com/tcondie/spark SPARK-18682

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/16686.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

This closes #16686






---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

-
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org