[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user asfgit closed the pull request at: https://github.com/apache/spark/pull/20096 --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user zsxwing commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160774506 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -39,6 +39,15 @@ private[continuous] sealed trait EpochCoordinatorMessage extends Serializable */ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage +/** --- End diff -- looks good to me
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160640668 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/continuous/EpochCoordinator.scala --- @@ -39,6 +39,15 @@ private[continuous] sealed trait EpochCoordinatorMessage extends Serializable */ private[sql] case object IncrementAndGetEpoch extends EpochCoordinatorMessage +/** --- End diff -- @zsxwing Can you take a look at these changes in this file.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160570055 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -49,28 +52,37 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { override val streamingTimeout = 30.seconds + protected val brokerProps = Map[String, Object]() + override def beforeAll(): Unit = { super.beforeAll() -testUtils = new KafkaTestUtils +testUtils = new KafkaTestUtils(brokerProps) testUtils.setup() } override def afterAll(): Unit = { if (testUtils != null) { testUtils.teardown() testUtils = null - super.afterAll() } +super.afterAll() } protected def makeSureGetOffsetCalled = AssertOnQuery { q => // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure -// its "getOffset" is called before pushing any data. Otherwise, because of the race contion, +// its "getOffset" is called before pushing any data. Otherwise, because of the race contOOion, --- End diff -- I remember wondering this morning why my command-O key sequence wasn't working... I guess this is where it went.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160569877 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest { } } - test("(de)serialization of initial offsets") { + test("KafkaSource with watermark") { +val now = System.currentTimeMillis() val topic = newTopic() -testUtils.createTopic(topic, partitions = 64) +testUtils.createTopic(newTopic(), partitions = 1) +testUtils.sendMessages(topic, Array(1).map(_.toString)) -val reader = spark +val kafka = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("startingOffsets", s"earliest") .option("subscribe", topic) + .load() -testStream(reader.load)( - makeSureGetOffsetCalled, - StopStream, - StartStream(), - StopStream) +val windowedAggregation = kafka + .withWatermark("timestamp", "10 seconds") + .groupBy(window($"timestamp", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start") as 'window, $"count") + +val query = windowedAggregation + .writeStream + .format("memory") + .outputMode("complete") + .queryName("kafkaWatermark") + .start() +query.processAllAvailable() +val rows = spark.table("kafkaWatermark").collect() +assert(rows.length === 1, s"Unexpected results: ${rows.toList}") +val row = rows(0) +// We cannot check the exact window start time as it depands on the time that messages were +// inserted by the producer. So here we just use a low bound to make sure the internal +// conversion works. 
+assert( + row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, + s"Unexpected results: $row") +assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") +query.stop() } +} - test("maxOffsetsPerTrigger") { +class KafkaSourceSuiteBase extends KafkaSourceTest { + + import testImplicits._ + + test("(de)serialization of initial offsets") { val topic = newTopic() -testUtils.createTopic(topic, partitions = 3) -testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) -testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) -testUtils.sendMessages(topic, Array("1"), Some(2)) +testUtils.createTopic(topic, partitions = 5) val reader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 10) .option("subscribe", topic) - .option("startingOffsets", "earliest") -val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] -val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - -val clock = new StreamManualClock - -val waitUntilBatchProcessed = AssertOnQuery { q => - eventually(Timeout(streamingTimeout)) { -if (!q.exception.isDefined) { - assert(clock.isStreamWaitingAt(clock.getTimeMillis())) -} - } - if (q.exception.isDefined) { -throw q.exception.get - } - true -} -testStream(mapped)( - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, -11, 108, 109, 110, 111, 112, 113, 114, 115, 116 - ), +testStream(reader.load)( + makeSureGetOffsetCalled, StopStream, - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, 
- // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, -11, 108, 109, 110, 111, 112, 113, 114, 115, 116, -12, 117, 118, 119, 120, 121, 122, 123, 124, 125 - ), - AdvanceManualClock(100), - waitUntilBatchProcessed, - //
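The comments in the removed `maxOffsetsPerTrigger` test ("1 from smallest, 1 from middle, 8 from biggest", then "1 more from middle, 9 more from biggest") are consistent with splitting the 10-offset budget across partitions in proportion to their backlog. A minimal sketch, assuming shares below 1 are rounded up (so small partitions are not starved) and larger shares are rounded down; `RateLimitSketch` and `rateLimit` are hypothetical names, not a copy of `KafkaSource`:

```scala
// Hypothetical sketch: prorate a per-trigger offset budget across partitions
// by backlog size. Assumption: shares < 1 round up, shares >= 1 round down.
object RateLimitSketch {
  /** Returns the new end offset per partition after applying `limit`. */
  def rateLimit(
      limit: Long,
      from: Map[Int, Long],
      until: Map[Int, Long]): Map[Int, Long] = {
    // Backlog per partition; partitions with nothing new are left out.
    val sizes: Map[Int, Long] = until.collect {
      case (p, end) if end - from(p) > 0 => p -> (end - from(p))
    }
    val total = sizes.values.sum.toDouble
    if (total < 1) until
    else until.map { case (p, end) =>
      p -> sizes.get(p).map { size =>
        val prorate = limit * (size / total)
        // Round tiny shares up so small partitions still make progress.
        val taken = if (prorate < 1) math.ceil(prorate) else math.floor(prorate)
        math.min(end, from(p) + taken.toLong) // never read past the end
      }.getOrElse(end)
    }
  }
}
```

With partitions holding 101, 11 and 1 messages, the first call yields end offsets 8, 1 and 1; feeding those back in yields 17, 2 and 1, which lines up with the first two `CheckAnswer` steps.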
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160569556 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared } } - test("stress test for failOnDataLoss=false") { -val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", "failOnDataLoss.*") - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("fetchOffset.retryIntervalMs", "3000") -val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] -val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] { + protected def startStream(ds: Dataset[Int]) = { --- End diff -- startStream is overridden in the continuous version of this test.
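A sketch of the template-method shape jose-torres describes, with hypothetical class names and plain strings standing in for `Dataset` and `StreamingQuery`: the shared stress-test body calls an overridable `startStream` hook, and the continuous subclass overrides only that hook.

```scala
// Hypothetical sketch: the test body lives in the base suite and calls a
// protected hook; subclasses change only how the query is started.
abstract class StressSuiteSketch {
  // Default hook: start a micro-batch query (a string stands in for it).
  protected def startStream(source: Seq[Int]): String =
    s"micro-batch query over ${source.size} records"

  // Shared test body, identical for both execution modes.
  def runStressTest(source: Seq[Int]): String = startStream(source)
}

// Micro-batch suite: inherits the default hook unchanged.
class MicroBatchStressSuiteSketch extends StressSuiteSketch

// Continuous suite: overrides only the hook.
class ContinuousStressSuiteSketch extends StressSuiteSketch {
  override protected def startStream(source: Seq[Int]): String =
    s"continuous query over ${source.size} records"
}
```

The hook is justified only because a subclass overrides it; with one caller and no override, keeping the code inline in the test would be simpler.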
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160552554 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,135 @@ +/* --- End diff -- Rename this file to KafkaContinuousSourceSuite
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160550486 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest { } } - test("(de)serialization of initial offsets") { + test("KafkaSource with watermark") { +val now = System.currentTimeMillis() val topic = newTopic() -testUtils.createTopic(topic, partitions = 64) +testUtils.createTopic(newTopic(), partitions = 1) +testUtils.sendMessages(topic, Array(1).map(_.toString)) -val reader = spark +val kafka = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("startingOffsets", s"earliest") .option("subscribe", topic) + .load() -testStream(reader.load)( - makeSureGetOffsetCalled, - StopStream, - StartStream(), - StopStream) +val windowedAggregation = kafka + .withWatermark("timestamp", "10 seconds") + .groupBy(window($"timestamp", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start") as 'window, $"count") + +val query = windowedAggregation + .writeStream + .format("memory") + .outputMode("complete") + .queryName("kafkaWatermark") + .start() +query.processAllAvailable() +val rows = spark.table("kafkaWatermark").collect() +assert(rows.length === 1, s"Unexpected results: ${rows.toList}") +val row = rows(0) +// We cannot check the exact window start time as it depands on the time that messages were +// inserted by the producer. So here we just use a low bound to make sure the internal +// conversion works. 
+assert( + row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, + s"Unexpected results: $row") +assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") +query.stop() } +} - test("maxOffsetsPerTrigger") { +class KafkaSourceSuiteBase extends KafkaSourceTest { + + import testImplicits._ + + test("(de)serialization of initial offsets") { val topic = newTopic() -testUtils.createTopic(topic, partitions = 3) -testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) -testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) -testUtils.sendMessages(topic, Array("1"), Some(2)) +testUtils.createTopic(topic, partitions = 5) val reader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 10) .option("subscribe", topic) - .option("startingOffsets", "earliest") -val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] -val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - -val clock = new StreamManualClock - -val waitUntilBatchProcessed = AssertOnQuery { q => - eventually(Timeout(streamingTimeout)) { -if (!q.exception.isDefined) { - assert(clock.isStreamWaitingAt(clock.getTimeMillis())) -} - } - if (q.exception.isDefined) { -throw q.exception.get - } - true -} -testStream(mapped)( - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, -11, 108, 109, 110, 111, 112, 113, 114, 115, 116 - ), +testStream(reader.load)( + makeSureGetOffsetCalled, StopStream, - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, 
- // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, -11, 108, 109, 110, 111, 112, 113, 114, 115, 116, -12, 117, 118, 119, 120, 121, 122, 123, 124, 125 - ), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now
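The watermark test asserts only a lower bound (`now - 5 * 1000`). A 5-second tumbling window starts at most 5 seconds before the record's timestamp, and that timestamp should be no earlier than `now`, assuming the producer and the test share a clock. Here is a minimal sketch of the window-start arithmetic, assuming non-negative epoch-millisecond timestamps and no window offset (`WindowBoundSketch` is a hypothetical name):

```scala
// Hypothetical sketch: start of the 5-second tumbling window that contains a
// timestamp. Assumes non-negative epoch millis and no window start offset.
object WindowBoundSketch {
  val windowMs: Long = 5 * 1000L

  // Round the timestamp down to the nearest multiple of the window length.
  def windowStart(tsMs: Long): Long = tsMs - (tsMs % windowMs)
}
```

Since `windowStart(ts) > ts - windowMs` and `ts >= now`, every window start satisfies `>= now - windowMs`. The exact value still depends on when the producer wrote the message, so the test cannot assert it.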
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160546427 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { --- End diff -- Add docs to explain what this class is for.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160559942 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala --- @@ -0,0 +1,348 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Locale +import java.util.concurrent.atomic.AtomicInteger + +import org.apache.kafka.clients.producer.ProducerConfig +import org.apache.kafka.common.serialization.ByteArraySerializer +import org.scalatest.time.SpanSugar._ +import scala.collection.JavaConverters._ + +import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} +import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} +import org.apache.spark.sql.streaming._ +import org.apache.spark.sql.types.{BinaryType, DataType} +import org.apache.spark.util.Utils + +/** + * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream. + * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have + * to duplicate all the code. 
+ */ +class KafkaContinuousSinkSuite extends KafkaContinuousTest { + import testImplicits._ + + override val streamingTimeout = 30.seconds + + override def beforeAll(): Unit = { +super.beforeAll() +testUtils = new KafkaTestUtils( + withBrokerProps = Map("auto.create.topics.enable" -> "false")) +testUtils.setup() + } + + override def afterAll(): Unit = { +if (testUtils != null) { + testUtils.teardown() + testUtils = null +} +super.afterAll() + } + + test("streaming - write to kafka with topic field") { +val inputTopic = newTopic() +testUtils.createTopic(inputTopic, partitions = 1) + +val input = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", inputTopic) + .option("startingOffsets", "earliest") + .load() + +val topic = newTopic() +testUtils.createTopic(topic) + +val writer = createKafkaWriter( + input.toDF(), + withTopic = None, + withOutputMode = Some(OutputMode.Append))( + withSelectExpr = s"'$topic' as topic", "value") + +val reader = createKafkaReader(topic) + .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") + .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") + .as[(Int, Int)] + .map(_._2) + +try { + testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) + failAfter(streamingTimeout) { +writer.processAllAvailable() + } + checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) + testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) + failAfter(streamingTimeout) { +writer.processAllAvailable() + } + checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) +} finally { + writer.stop() +} + } + + test("streaming - write data with bad schema") { --- End diff -- Missing tests for "w/o topic field, with topic option" and "topic field and topic option", and also a test for the case when the topic field is null.
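The three cases tdas asks for can be written down as a small decision table. Below is a hedged sketch. It assumes the behavior described in the Spark Kafka integration guide: a `topic` option, when set, overrides the per-row `topic` column; otherwise the column must exist and be non-null. `resolveTopic` and its error strings are hypothetical and are not Spark's messages.

```scala
// Hypothetical sketch of sink-side topic resolution for one row.
// Assumption: the "topic" option, when present, overrides the topic column.
object TopicResolutionSketch {
  def resolveTopic(
      topicOption: Option[String],
      hasTopicColumn: Boolean,
      rowTopic: String): Either[String, String] =
    topicOption match {
      case Some(t) => Right(t) // option wins over any topic column
      case None if !hasTopicColumn =>
        Left("no topic column and no topic option") // schema-level problem
      case None if rowTopic == null =>
        Left("null topic in row") // only detectable per row, at runtime
      case None => Right(rowTopic)
    }
}
```

Each branch maps to one test: topic option without a topic field, topic option plus topic field (the option should win), a null topic field, and the existing "with topic field" test.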
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160546593 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { + override val defaultTrigger = Trigger.Continuous(1000) + override val defaultUseV2Sink = true + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + // In addition to setting the partitions in Kafka, we have to wait until the query has + // reconfigured to the new count so the test framework can hook in properly. + override protected def setTopicPartitions( + topic: String, newCount: Int, query: StreamExecution) = { +testUtils.addPartitions(topic, newCount) +eventually(timeout(streamingTimeout)) { + assert( +query.lastExecution.logical.collectFirst { + case DataSourceV2Relation(_, r: KafkaContinuousReader) => r +}.exists(_.knownPartitions.size == newCount), +s"query never reconfigured to $newCount partitions") +} + } + + test("ensure continuous stream is being used") { +val query = spark.readStream + .format("rate") + .option("numPartitions", "1") + .option("rowsPerSecond", "1") + .load() + +testStream(query)( + Execute(q => assert(q.isInstanceOf[ContinuousExecution])) +) + } +} + +class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest { --- End diff -- The `{ }` may not be needed.
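On the `{ }` point: a Scala class whose only job is to mix a trait into a base class needs no body. The sketch below uses hypothetical names that mirror `KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest`. It shows that linearization lets the trait's override of the default trigger win over the base class.

```scala
// Hypothetical sketch of the suite hierarchy; strings stand in for triggers.
abstract class SourceSuiteBaseSketch {
  def defaultTrigger: String = "ProcessingTime(0)"
  def describe: String = s"runs shared source tests with $defaultTrigger"
}

// Like KafkaContinuousTest: a trait that extends the base and overrides defaults.
trait ContinuousTestSketch extends SourceSuiteBaseSketch {
  override def defaultTrigger: String = "Continuous(1000)"
}

// No braces needed: both class bodies would be empty.
class MicroBatchSourceSuiteSketch extends SourceSuiteBaseSketch
class ContinuousSourceSuiteSketch extends SourceSuiteBaseSketch with ContinuousTestSketch
```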
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160546455 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { + override val defaultTrigger = Trigger.Continuous(1000) + override val defaultUseV2Sink = true + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + // In addition to setting the partitions in Kafka, we have to wait until the query has + // reconfigured to the new count so the test framework can hook in properly. + override protected def setTopicPartitions( + topic: String, newCount: Int, query: StreamExecution) = { +testUtils.addPartitions(topic, newCount) +eventually(timeout(streamingTimeout)) { + assert( +query.lastExecution.logical.collectFirst { + case DataSourceV2Relation(_, r: KafkaContinuousReader) => r +}.exists(_.knownPartitions.size == newCount), +s"query never reconfigured to $newCount partitions") +} + } + + test("ensure continuous stream is being used") { +val query = spark.readStream + .format("rate") + .option("numPartitions", "1") + .option("rowsPerSecond", "1") + .load() + +testStream(query)( + Execute(q => assert(q.isInstanceOf[ContinuousExecution])) +) + } +} + +class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest { --- End diff -- Add docs.
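The `local[10]` master follows from how continuous processing schedules work. Each input partition runs as a long-lived task, so all of them need a slot at the same time, and `local[2]` cannot host more than two. The hypothetical helper below makes that constraint explicit for `local[N]` masters. Other master URLs are deliberately not handled.

```scala
// Hypothetical helper: in continuous mode every partition holds a task slot
// for the lifetime of the query, so slots must cover all partitions at once.
object SlotCheckSketch {
  private val LocalN = """local\[(\d+)\]""".r

  /** Number of task slots for a local master, if recognized. */
  def localSlots(master: String): Option[Int] = master match {
    case LocalN(n) => Some(n.toInt)
    case "local"   => Some(1)
    case _         => None // local[*], cluster masters, etc. are not handled
  }

  def canScheduleAll(master: String, partitions: Int): Boolean =
    localSlots(master).exists(_ >= partitions)
}
```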
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160550392 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -237,85 +378,67 @@ class KafkaSourceSuite extends KafkaSourceTest { } } - test("(de)serialization of initial offsets") { + test("KafkaSource with watermark") { +val now = System.currentTimeMillis() val topic = newTopic() -testUtils.createTopic(topic, partitions = 64) +testUtils.createTopic(newTopic(), partitions = 1) +testUtils.sendMessages(topic, Array(1).map(_.toString)) -val reader = spark +val kafka = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("startingOffsets", s"earliest") .option("subscribe", topic) + .load() -testStream(reader.load)( - makeSureGetOffsetCalled, - StopStream, - StartStream(), - StopStream) +val windowedAggregation = kafka + .withWatermark("timestamp", "10 seconds") + .groupBy(window($"timestamp", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start") as 'window, $"count") + +val query = windowedAggregation + .writeStream + .format("memory") + .outputMode("complete") + .queryName("kafkaWatermark") + .start() +query.processAllAvailable() +val rows = spark.table("kafkaWatermark").collect() +assert(rows.length === 1, s"Unexpected results: ${rows.toList}") +val row = rows(0) +// We cannot check the exact window start time as it depands on the time that messages were +// inserted by the producer. So here we just use a low bound to make sure the internal +// conversion works. 
+assert( + row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, + s"Unexpected results: $row") +assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") +query.stop() } +} - test("maxOffsetsPerTrigger") { +class KafkaSourceSuiteBase extends KafkaSourceTest { + + import testImplicits._ + + test("(de)serialization of initial offsets") { --- End diff -- Is this needed in the common KafkaSourceSuiteBase?
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160549136 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -49,28 +52,37 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { override val streamingTimeout = 30.seconds + protected val brokerProps = Map[String, Object]() + override def beforeAll(): Unit = { super.beforeAll() -testUtils = new KafkaTestUtils +testUtils = new KafkaTestUtils(brokerProps) testUtils.setup() } override def afterAll(): Unit = { if (testUtils != null) { testUtils.teardown() testUtils = null - super.afterAll() } +super.afterAll() } protected def makeSureGetOffsetCalled = AssertOnQuery { q => // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure -// its "getOffset" is called before pushing any data. Otherwise, because of the race contion, +// its "getOffset" is called before pushing any data. Otherwise, because of the race contOOion, --- End diff -- spelling mistake?
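The same hunk also moves `super.afterAll()` out of the `if (testUtils != null)` block. The sketch below uses hypothetical stand-in classes to show why. If setup failed before `testUtils` was assigned, the old placement also skipped the parent suite's cleanup.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for the shared-session base suite.
class SharedSessionSuiteSketch {
  val cleanupLog: ListBuffer[String] = ListBuffer.empty[String]
  def afterAll(): Unit = {
    cleanupLog += "session stopped"
  }
}

// Hypothetical stand-in for KafkaSourceTest; `testUtils` is null if setup failed.
class KafkaSuiteSketch(var testUtils: AnyRef) extends SharedSessionSuiteSketch {
  override def afterAll(): Unit = {
    if (testUtils != null) {
      cleanupLog += "broker torn down"
      testUtils = null
    }
    // Outside the if: parent cleanup runs even when setup never completed.
    super.afterAll()
  }
}
```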
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160516280 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { + override val defaultTrigger = Trigger.Continuous(1000) + override val defaultUseV2Sink = true + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + override protected def setTopicPartitions( --- End diff -- Add a comment on what this method does. It is asserting something, so it does not look like it only "sets" something.
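One possible answer to the review question, sketched with hypothetical stand-ins: the method changes state by adding partitions, then waits until the running query observes the new count, and fails on timeout. `eventually` below is a minimal standard-library re-implementation in the spirit of ScalaTest's helper of the same name. `FakeTopic` replaces the real broker and `KafkaContinuousReader`.

```scala
// Hypothetical sketch; FakeTopic stands in for the broker and the reader.
object SetPartitionsSketch {
  /** Retries `check` until it stops throwing AssertionError or time runs out. */
  def eventually[T](timeoutMs: Long, intervalMs: Long = 10L)(check: => T): T = {
    val deadline = System.currentTimeMillis() + timeoutMs
    var result: Option[T] = None
    while (result.isEmpty) {
      try {
        result = Some(check)
      } catch {
        case e: AssertionError =>
          if (System.currentTimeMillis() > deadline) throw e
          Thread.sleep(intervalMs)
      }
    }
    result.get
  }

  final class FakeTopic {
    @volatile var brokerPartitions: Int = 1
    @volatile var readerPartitions: Int = 1 // what the running query has seen
  }

  /**
   * Adds partitions to `topic`, then blocks until the running query reports
   * `newCount` partitions, failing if it does not within `timeoutMs`.
   */
  def setTopicPartitions(topic: FakeTopic, newCount: Int, timeoutMs: Long): Unit = {
    topic.brokerPartitions = newCount
    eventually(timeoutMs) {
      assert(topic.readerPartitions == newCount,
        s"query never reconfigured to $newCount partitions")
    }
  }
}
```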
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160552160 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala --- @@ -977,20 +971,8 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared } } - test("stress test for failOnDataLoss=false") { -val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", "failOnDataLoss.*") - .option("startingOffsets", "earliest") - .option("failOnDataLoss", "false") - .option("fetchOffset.retryIntervalMs", "3000") -val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] -val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] { + protected def startStream(ds: Dataset[Int]) = { --- End diff -- I think this refactoring is not needed. `startStream()` is not used anywhere other than in this test, so I don't see the point of defining it outside the test.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160552695 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { --- End diff -- Also since this is used not just by the source, but also the sink, better to define this in a different file. 
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160516177 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,133 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties +import java.util.concurrent.atomic.AtomicInteger + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.StreamExecution +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.{StreamTest, Trigger} +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +trait KafkaContinuousTest extends KafkaSourceTest { + override val defaultTrigger = Trigger.Continuous(1000) + override val defaultUseV2Sink = true + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + override protected def setTopicPartitions( --- End diff -- Add a comment on what this method does. It asserts something, so it doesn't look like it only "sets" something.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160009573 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala --- @@ -19,7 +19,8 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition -import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} +import org.apache.spark.sql.execution.streaming.{Offset => OffsetV1, SerializedOffset} --- End diff -- Ummm.. I think it's better to rename the new Offset to OffsetV2 than to rename the old one to OffsetV1. This keeps it more consistent with other APIs that have V2 in their names. Also, the MicroBatchExecution in the other PR uses OffsetV2. Sorry for not thinking ahead on this.
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160004815 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ContinuousReader]] for data from kafka. 
+ * + * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be + * read by per-task consumers generated later. + * @param kafkaParams String params for per-task Kafka consumers. + * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which + * are not Kafka consumer params. + * @param metadataPath Path to a directory this reader can use for writing metadata. + * @param initialOffsets The Kafka offsets to start reading data at. + * @param failOnDataLoss Flag indicating whether reading should fail in data loss + * scenarios, where some offsets after the specified initial ones can't be + * properly read. + */ +class KafkaContinuousReader( +offsetReader: KafkaOffsetReader, +kafkaParams: java.util.Map[String, Object], --- End diff -- Since there are lots of different uses of java.util.*, you can probably rename java.util to ju. That's what the KafkaSourceProvider class does.
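For reference, the `ju` alias the reviewer mentions is Scala's rename-on-import syntax. A minimal sketch, assuming a hypothetical `toKafkaParams` helper that is not part of the PR: with the alias, the Java collection types stay short and never clash with Scala's own `Map`.

```scala
import java.{util => ju}

object JuAliasSketch {
  // Hypothetical helper: converts Scala string params into the Java map that Kafka
  // consumers expect. `ju.Map` / `ju.HashMap` are java.util types via the alias,
  // while the unqualified `Map` is still Scala's immutable Map.
  def toKafkaParams(params: Map[String, String]): ju.Map[String, Object] = {
    val result = new ju.HashMap[String, Object]()
    params.foreach { case (k, v) => result.put(k, v) }
    result
  }
}
```

With the alias in place, a signature like `kafkaParams: java.util.Map[String, Object]` would read `kafkaParams: ju.Map[String, Object]`.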
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160007884 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -418,11 +418,16 @@ abstract class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is intended for use primarily when writing tests. */ - private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = { + private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = { assertAwaitThread() def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset + if (sources.length <= sourceIndex) { +false --- End diff -- The race condition is present because `sources` is initialized to Seq.empty and then assigned to the actual sources. You can instead initialize `sources` to null, and then return `notDone = false` when `sources` is null. Any other mismatch should throw an error. I don't like the current code, which hides erroneous situations.
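A minimal sketch of the suggested `notDone` logic, assuming a simplified, hypothetical stand-in class rather than the real StreamExecution. Here `null` means "sources not initialized yet", which is kept distinct from an empty list of sources. Any other bad index fails loudly through the indexed lookup, so it is not silently treated as done.

```scala
// Hypothetical, simplified stand-in for StreamExecution's bookkeeping.
final class AwaitOffsetSketch[S, O] {
  // null = not initialized yet (distinct from "no sources").
  @volatile var sources: IndexedSeq[S] = null
  @volatile var committedOffsets: Map[S, O] = Map.empty

  def notDone(sourceIndex: Int, newOffset: O): Boolean = {
    val localSources = sources
    val localCommitted = committedOffsets
    if (localSources == null) {
      // Sources not populated yet: follow the reviewer's suggestion and return false.
      false
    } else {
      // An out-of-range index throws IndexOutOfBoundsException instead of being hidden.
      val source = localSources(sourceIndex)
      !localCommitted.contains(source) || localCommitted(source) != newOffset
    }
  }
}
```

Reading both vars into locals first means each call works against a single snapshot of `sources` and `committedOffsets`.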
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160006676 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala --- @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{BinaryType, StringType, StructType} + +/** + * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we + * don't need to really send one. 
+ */ +case object KafkaWriterCommitMessage extends WriterCommitMessage + +/** + * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory. + * @param topic The topic this writer is responsible for. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. + */ +class KafkaContinuousWriter( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends ContinuousWriter with SupportsWriteInternalRow { + + override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory = +KafkaContinuousWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} + +/** + * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate + * the per-task data writers. + * @param topic The topic that should be written to. If None, topic will be inferred from + * a `topic` field in the incoming data. + * @param producerParams Parameters for Kafka producers in each task. + * @param schema The schema of the input data. + */ +case class KafkaContinuousWriterFactory( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends DataWriterFactory[InternalRow] { + + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { +new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes) + } +} + +/** + * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to + * process incoming rows. + * + * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred + *from a `topic` field in the incoming data. + * @param producerParams Parameters to use for the Kafka producer. 
+ * @param inputSchema The attributes in the input data. + */ +class KafkaContinuousDataWriter( +targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) + extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { + import scala.collection.JavaConverters._ + + private lazy val producer = CachedKafkaProducer.getOrCreate( +new java.util.HashMap[String, Object](producerParams.asJava)) + + def write(row: InternalRow): Unit = { +checkForErrors() +sendRow(row, producer) + } + + def commit():
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160005666 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala --- @@ -74,8 +56,49 @@ private[kafka010] class KafkaWriteTask( producer = null } } +} + +private[kafka010] class KafkaRowWriter( --- End diff -- nit: make this a trait so that it cannot be used directly.
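In Scala 2, traits cannot take constructor parameters. Because `KafkaRowWriter` takes its schema and topic as constructor arguments, an `abstract class` is one way to get the same "cannot be used directly" effect. A minimal sketch, with hypothetical names (`RowWriterBase`, `BufferingRowWriter`) and a simplified topic rule: a configured topic wins, and otherwise the row's topic is used and must be non-null.

```scala
// Hypothetical, simplified base: shared logic lives in an abstract class, so it can
// only be used through a concrete subclass.
abstract class RowWriterBase(topicOverride: Option[String]) {
  protected def resolveTopic(rowTopic: String): String =
    topicOverride.getOrElse {
      if (rowTopic == null) {
        throw new NullPointerException("null topic present in the data")
      }
      rowTopic
    }
}

// Hypothetical concrete writer that records (topic, value) pairs instead of using a producer.
final class BufferingRowWriter(topicOverride: Option[String])
    extends RowWriterBase(topicOverride) {
  val sent = scala.collection.mutable.ArrayBuffer.empty[(String, String)]
  def write(rowTopic: String, value: String): Unit =
    sent += (resolveTopic(rowTopic) -> value)
}

// new RowWriterBase(None)  // does not compile: class RowWriterBase is abstract
```

A trait with abstract `inputSchema`/`topic` members would also work, but it would change how subclasses pass those values in.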
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159985929 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala --- @@ -145,6 +149,19 @@ private[kafka010] class KafkaOffsetReader( } } +partitionOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && +off != KafkaOffsetRangeLimit.EARLIEST => +if (fetched(tp) != off) { + reportDataLoss( +s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") +} + case _ => + // no real way to check that beginning or end is reasonable --- End diff -- nit: wrong indent
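For comparison, here is the same check with each `case` body indented under its `case`. This is a standalone sketch with simplifying assumptions: partitions are plain strings instead of `TopicPartition`, and the sentinel values (-1 for latest, -2 for earliest) stand in for `KafkaOffsetRangeLimit.LATEST` / `EARLIEST`.

```scala
object StartingOffsetCheckSketch {
  // Assumed sentinel values standing in for KafkaOffsetRangeLimit.LATEST / EARLIEST.
  val Latest: Long = -1L
  val Earliest: Long = -2L

  // Reports a data-loss message for every explicitly requested offset that the
  // consumer did not actually land on.
  def verify(
      requested: Map[String, Long],
      fetched: Map[String, Long],
      reportDataLoss: String => Unit): Unit = {
    requested.foreach {
      case (tp, off) if off != Latest && off != Earliest =>
        if (fetched(tp) != off) {
          reportDataLoss(s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}")
        }
      case _ =>
        // no real way to check that beginning or end is reasonable
    }
  }
}
```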
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r160008897 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSuite.scala --- @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingExecutionRelation, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +class KafkaContinuousSuite extends KafkaSourceTest with SharedSQLContext { + import testImplicits._ + + // We need more than the default local[2] to be able to schedule all partitions simultaneously. 
+ override protected def createSparkSession = new TestSparkSession( +new SparkContext( + "local[10]", + "continuous-stream-test-sql-context", + sparkConf.set("spark.sql.testkey", "true"))) + + test("basic") { --- End diff -- You are not testing all the cases of starting offsets and other options. `KafkaSourceSuite` tests them, and since this is a completely different code path from microbatch, all of these cases should be tested explicitly. I think KafkaSourceSuite can be refactored so that the code can be reused. It's some amount of work, but without it we can't be confident that all the new code paths are tested. The same goes for the sink: there are combinations of options that are not tested (default topic specified/not specified, null value in the topic column).
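One way to keep such a matrix explicit is to enumerate the combinations once and generate a test per case, for example by calling ScalaTest's `test(name) { ... }` inside a loop in a shared base suite. The sketch below only builds the case names. The mode labels, the example JSON offsets, and the object name are illustrative assumptions.

```scala
object KafkaOptionMatrixSketch {
  val modes: Seq[String] = Seq("microbatch", "continuous")

  // Source: the startingOffsets variants (example JSON is illustrative only).
  val startingOffsets: Seq[String] = Seq("earliest", "latest", """{"topic":{"0":0}}""")

  // Sink: is the `topic` option set, and is the row's topic column null?
  val sinkCases: Seq[(Boolean, Boolean)] =
    for (optionSet <- Seq(true, false); nullColumn <- Seq(true, false))
      yield (optionSet, nullColumn)

  def sourceTestNames: Seq[String] =
    for (mode <- modes; offsets <- startingOffsets)
      yield s"$mode: startingOffsets=$offsets"

  def sinkTestNames: Seq[String] =
    for (mode <- modes; (optionSet, nullColumn) <- sinkCases)
      yield s"$mode: topicOptionSet=$optionSet, nullTopicColumn=$nullColumn"
}
```

Generating names from one shared list makes it harder for the continuous suite to drift from the microbatch one.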
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159984700 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala --- @@ -0,0 +1,224 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.catalyst.expressions.UnsafeRow +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +/** + * A [[ContinuousReader]] for data from kafka. 
+ * + * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be + * read by per-task consumers generated later. + * @param kafkaParams String params for per-task Kafka consumers. + * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which + * are not Kafka consumer params. + * @param metadataPath Path to a directory this reader can use for writing metadata. + * @param initialOffsets The Kafka offsets to start reading data at. + * @param failOnDataLoss Flag indicating whether reading should fail in data loss + * scenarios, where some offsets after the specified initial ones can't be + * properly read. + */ +class KafkaContinuousReader( +offsetReader: KafkaOffsetReader, +kafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + // Initialized when creating read tasks. If this diverges from the partitions at the latest + // offsets, we need to reconfigure. + // Exposed outside this object only for unit tests. 
+ private[sql] var knownPartitions: Set[TopicPartition] = _ + + override def readSchema: StructType = KafkaOffsetReader.kafkaSchema + + private var offset: Offset = _ + override def setOffset(start: java.util.Optional[Offset]): Unit = { +offset = start.orElse { + val offsets = initialOffsets match { +case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) +case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets()) +case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) + } + logInfo(s"Initial offsets: $offsets") + offsets +} + } + + override def getStartOffset(): Offset = offset + + override def deserializeOffset(json: String): Offset = { +KafkaSourceOffset(JsonUtils.partitionOffsets(json)) + } + + override def createUnsafeRowReadTasks(): java.util.List[ReadTask[UnsafeRow]] = { +import scala.collection.JavaConverters._ + +val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset) + +val newPartitions = + offsetReader.fetchLatestOffsets().keySet.diff(oldStartPartitionOffsets.keySet) +val newPartitionOffsets =
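A side note on `setOffset` as quoted above: `java.util.Optional.orElse(T)` takes its argument by value. As the snippet reads, the `{ ... }` block that fetches the initial offsets is therefore evaluated even when `start` already holds an offset. `orElseGet(Supplier)` defers that work. A minimal sketch with a hypothetical fetch function that counts how often it runs:

```scala
import java.util.Optional
import java.util.concurrent.atomic.AtomicInteger

object OptionalDefaultSketch {
  val fetches = new AtomicInteger(0)

  // Hypothetical stand-in for an expensive call such as fetchEarliestOffsets().
  def fetchInitialOffsets(): String = {
    fetches.incrementAndGet()
    "earliest-offsets"
  }

  // orElse: the argument is evaluated before the call, even if `start` is present.
  def eager(start: Optional[String]): String = start.orElse(fetchInitialOffsets())

  // orElseGet: the Supplier runs only when `start` is empty.
  def deferred(start: Optional[String]): String = start.orElseGet(() => fetchInitialOffsets())
}
```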
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159983463 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/CachedKafkaConsumer.scala --- @@ -261,6 +261,10 @@ private[kafka010] case class CachedKafkaConsumer private( } } + def wakeup(): Unit = { --- End diff -- This does not seem to be used anywhere?
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159797164 --- Diff: sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamExecution.scala --- @@ -418,11 +418,16 @@ abstract class StreamExecution( * Blocks the current thread until processing for data from the given `source` has reached at * least the given `Offset`. This method is intended for use primarily when writing tests. */ - private[sql] def awaitOffset(source: Source, newOffset: Offset): Unit = { + private[sql] def awaitOffset(sourceIndex: Int, newOffset: Offset): Unit = { assertAwaitThread() def notDone = { val localCommittedOffsets = committedOffsets - !localCommittedOffsets.contains(source) || localCommittedOffsets(source) != newOffset + if (sources.length <= sourceIndex) { +false --- End diff -- `sources` is a var that might not be populated yet. (This race condition showed up in AddKafkaData in my tests.)
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159796525 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{BinaryType, StringType, StructType} + +class ContinuousKafkaWriter( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends ContinuousWriter with SupportsWriteInternalRow { + + override def createInternalRowWriterFactory(): KafkaWriterFactory = +KafkaWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} + +case class KafkaWriterFactory( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends DataWriterFactory[InternalRow] { + + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { +new KafkaDataWriter(topic, producerParams, schema.toAttributes) + } +} + +case class KafkaWriterCommitMessage() extends WriterCommitMessage {} + +class KafkaDataWriter( +topic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) + extends DataWriter[InternalRow] { + import scala.collection.JavaConverters._ + + @volatile private var failedWrite: Exception = _ + private val projection = createProjection + private lazy val producer = CachedKafkaProducer.getOrCreate( +new java.util.HashMap[String, Object](producerParams.asJava)) + + 
 private val callback = new Callback() { +override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { + if (failedWrite == null && e != null) { +failedWrite = e + } +} + } + + def write(row: InternalRow): Unit = { +if (failedWrite != null) return + +val projectedRow = projection(row) +val topic = projectedRow.getUTF8String(0) +val key = projectedRow.getBinary(1) +val value = projectedRow.getBinary(2) + +if (topic == null) { + throw new NullPointerException(s"null topic present in the data. Use the " + +s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") +} +val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) +producer.send(record, callback) + } + + def commit(): WriterCommitMessage = KafkaWriterCommitMessage() + def abort(): Unit = {} + + def close(): Unit = { +checkForErrors() +if (producer != null) { + producer.flush() + checkForErrors() +} --- End diff -- I think CachedKafkaProducer handles closing automatically, but since these are long-lived I can do it explicitly too.
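The writer above records the first asynchronous send failure in a `@volatile` field and rethrows it from `write`/`close`. A minimal sketch of that pattern, using a hypothetical `FirstErrorTracker` that is not part of the PR. An `AtomicReference` with `compareAndSet` keeps "first error wins" explicit, whichever thread invokes the callback.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper mirroring the callback / checkForErrors pattern quoted above.
final class FirstErrorTracker {
  private val failedWrite = new AtomicReference[Exception](null)

  // Called from the producer callback: keep only the first non-null failure.
  def onCompletion(e: Exception): Unit = {
    if (e != null) failedWrite.compareAndSet(null, e)
  }

  // Called on the task thread (e.g. before a write, and after flush() in close()).
  def checkForErrors(): Unit = {
    val e = failedWrite.get()
    if (e != null) throw e
  }
}
```

In `close()`, the order in the diff still applies: check, flush, check again. Any explicit producer close would come after that final check.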
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159796099 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.kafka010
+
+import java.io._
+import java.nio.charset.StandardCharsets
+import java.util.concurrent.atomic.AtomicBoolean
+
+import org.apache.commons.io.IOUtils
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.WakeupException
+
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow}
+import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter}
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset}
+import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION}
+import org.apache.spark.sql.sources.v2.reader._
+import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.unsafe.types.UTF8String
+
+class ContinuousKafkaReader(
+    kafkaReader: KafkaOffsetReader,
+    executorKafkaParams: java.util.Map[String, Object],
+    sourceOptions: Map[String, String],
+    metadataPath: String,
+    initialOffsets: KafkaOffsetRangeLimit,
+    failOnDataLoss: Boolean)
+  extends ContinuousReader with SupportsScanUnsafeRow with Logging {
+
+  override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = {
+    val mergedMap = offsets.map {
+      case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
+    }.reduce(_ ++ _)
+    KafkaSourceOffset(mergedMap)
+  }
+
+  private lazy val session = SparkSession.getActiveSession.get
+  private lazy val sc = session.sparkContext
+
+  private lazy val pollTimeoutMs = sourceOptions.getOrElse(
+    "kafkaConsumer.pollTimeoutMs",
+    sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString
+  ).toLong
+
+  private val maxOffsetsPerTrigger =
+    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)
+
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
+    val offsets = initialOffsets match {
+      case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets())
+      case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets())
+      case SpecificOffsetRangeLimit(p) => fetchAndVerify(p)
+    }
+    logInfo(s"Initial offsets: $offsets")
+    offsets.partitionToOffsets
+  }
+
+  private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = {
+    val result = kafkaReader.fetchSpecificOffsets(specificOffsets)
+    specificOffsets.foreach {
+      case (tp, off) if off != KafkaOffsetRangeLimit.LATEST &&
+          off != KafkaOffsetRangeLimit.EARLIEST =>
+        if (result(tp) != off) {
+          reportDataLoss(
+            s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
+        }
+      case _ =>
+        // no real way to check that beginning or end is reasonable
+    }
+    KafkaSourceOffset(result)
+  }
+
+  // Initialized when creating read tasks. If this diverges from the partitions
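In this diff, `mergeOffsets` collapses the per-partition offsets reported by individual reader tasks into a single partition-to-offset map, and `fetchAndVerify` flags any explicitly requested starting offset that the consumer did not honor, skipping the LATEST/EARLIEST sentinels because those cannot be checked. The sketch below reproduces both steps in plain Scala under stated assumptions: `TopicPartition` and `PartitionOffset` are local stand-ins rather than the Kafka/Spark classes, and the sentinel values `-1L` and `-2L` are assumed to mirror `KafkaOffsetRangeLimit.LATEST` and `EARLIEST`. It deliberately uses `foldLeft` instead of `reduce`, so an empty input yields an empty map instead of throwing.

```scala
// Local stand-ins for org.apache.kafka.common.TopicPartition and the
// per-task KafkaSourcePartitionOffset(partition, offset) in the diff.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionOffset(tp: TopicPartition, offset: Long)

object OffsetBookkeeping {
  // Sentinels assumed for illustration: "resolve to latest / earliest".
  val Latest = -1L
  val Earliest = -2L

  // Merge one offset per reader task into a single partition -> offset map.
  // foldLeft keeps an empty input from throwing, unlike reduce(_ ++ _).
  def mergeOffsets(offsets: Seq[PartitionOffset]): Map[TopicPartition, Long] =
    offsets.foldLeft(Map.empty[TopicPartition, Long]) { (acc, po) =>
      acc + (po.tp -> po.offset)
    }

  // Report every concrete requested offset that the consumer resolved to a
  // different value; sentinel requests have nothing to compare against.
  def verify(
      requested: Map[TopicPartition, Long],
      resolved: Map[TopicPartition, Long]): Seq[String] =
    requested.toSeq.collect {
      case (tp, off) if off != Latest && off != Earliest && resolved(tp) != off =>
        s"startingOffsets for $tp was $off but consumer reset to ${resolved(tp)}"
    }
}
```

Returning the mismatch messages, rather than calling a data-loss reporter directly, keeps the check easy to test; in the reader itself each mismatch goes to `reportDataLoss`.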
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159795887 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  // Initialized when creating read tasks. If this diverges from the partitions
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159795747 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  // Initialized when creating read tasks. If this diverges from the partitions
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user jose-torres commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159795735 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  /**
+   * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only
+   * called in StreamExecutionThread. Otherwise, interrupting a thread while running
+   * `KafkaConsumer.poll` may hang forever (KAFKA-1894).
+   */
+  private lazy val initialPartitionOffsets = {
--- End diff --

oops, no
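The Scaladoc on `initialPartitionOffsets` relies on a property of Scala's `lazy val`: its initializer runs once, on whichever thread first reads the field, not when the enclosing object is constructed. That is what keeps the first offset fetch off the constructing thread. A minimal sketch of that behavior, with a hypothetical `fetchOffsets` function standing in for the Kafka offset lookup (it only shows which thread performs the fetch and does not by itself make an interrupted `poll` safe):

```scala
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical holder: fetchOffsets stands in for the Kafka offset lookup.
class LazyOffsets(fetchOffsets: () => Map[Int, Long]) {
  val fetchCount = new AtomicInteger(0)
  @volatile var initThread: String = null

  // Evaluated once, on the first thread that reads it; constructing a
  // LazyOffsets does not trigger the fetch.
  lazy val initialOffsets: Map[Int, Long] = {
    fetchCount.incrementAndGet()
    initThread = Thread.currentThread().getName
    fetchOffsets()
  }
}
```

Constructing the holder on one thread and first reading `initialOffsets` on a thread named, say, `stream-execution` records that thread as the initializer, and later reads reuse the cached value without fetching again.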
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159569390 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  // Initialized when creating read tasks. If this diverges from the partitions at the
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159569305 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  // Initialized when creating read tasks. If this diverges from the partitions at the
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159569599 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala ---
@@ -0,0 +1,246 @@
...
+  // Initialized when creating read tasks. If this diverges from the partitions at the
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159568911 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, +executorKafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) +}.reduce(_ ++ _) +KafkaSourceOffset(mergedMap) + } + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private lazy val pollTimeoutMs = sourceOptions.getOrElse( +"kafkaConsumer.pollTimeoutMs", +sc.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString + ).toLong + + private val maxOffsetsPerTrigger = +sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = { + val offsets = initialOffsets match { +case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) +case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) +case SpecificOffsetRangeLimit(p) => fetchAndVerify(p) + } + logInfo(s"Initial offsets: $offsets") + offsets.partitionToOffsets + } + + private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = { +val result = kafkaReader.fetchSpecificOffsets(specificOffsets) +specificOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && +off != KafkaOffsetRangeLimit.EARLIEST => +if (result(tp) != off) { + reportDataLoss( +s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}") +} + case _ => + // no real way to check that beginning or end is reasonable +} +KafkaSourceOffset(result) + } + + // Initialized when creating read tasks. If this diverges from the partitions at the
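The `mergeOffsets` override in the diff above folds one `PartitionOffset` per Kafka partition into a single `KafkaSourceOffset`. Below is a minimal sketch of that behavior. `TopicPartition`, `PartitionOffset`, `KafkaSourcePartitionOffset` and `KafkaSourceOffset` are simplified stand-ins, not the real Kafka or Spark classes.

```scala
// Simplified stand-ins (assumptions): the real types come from Kafka
// (TopicPartition) and Spark's DataSourceV2 streaming API (PartitionOffset).
final case class TopicPartition(topic: String, partition: Int)
trait PartitionOffset
final case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long)
  extends PartitionOffset
final case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])

// Same shape as the reviewed code: build one single-entry map per partition,
// then reduce them into one map keyed by partition.
def mergeOffsets(offsets: Array[PartitionOffset]): KafkaSourceOffset = {
  val mergedMap = offsets.map {
    case KafkaSourcePartitionOffset(p, o) => Map(p -> o)
  }.reduce(_ ++ _)
  KafkaSourceOffset(mergedMap)
}

val merged = mergeOffsets(Array(
  KafkaSourcePartitionOffset(TopicPartition("events", 0), 10L),
  KafkaSourcePartitionOffset(TopicPartition("events", 1), 20L)))
println(merged.partitionToOffsets)
```

`reduce` throws `UnsupportedOperationException` on an empty array, so this shape assumes that at least one partition reports an offset. By contrast, `offsets.collect { ... }.toMap` would return an empty map, but it would also silently skip unexpected offset types instead of failing with a `MatchError`.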
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159728249 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{BinaryType, StringType, StructType} + +class ContinuousKafkaWriter( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends ContinuousWriter with SupportsWriteInternalRow { + + override def createInternalRowWriterFactory(): KafkaWriterFactory = +KafkaWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} + +case class KafkaWriterFactory( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends DataWriterFactory[InternalRow] { + + override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { +new KafkaDataWriter(topic, producerParams, schema.toAttributes) + } +} + +case class KafkaWriterCommitMessage() extends WriterCommitMessage {} --- End diff -- If there is no param, then this can be a `case object`. And I don't think the `{}` is needed. ---
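The suggestion above can be sketched as follows. `WriterCommitMessage` is redeclared here as a plain `Serializable` trait so the snippet compiles without Spark on the classpath. That stand-in and the `commitAll` helper are assumptions for illustration.

```scala
// Stand-in (assumption) for Spark's DataSourceV2 WriterCommitMessage marker type.
trait WriterCommitMessage extends Serializable

// Before: a parameterless case class with an empty body; every call allocates a new instance.
// case class KafkaWriterCommitMessage() extends WriterCommitMessage {}

// After: a case object is one shared instance and needs neither `()` nor `{}`.
case object KafkaWriterCommitMessage extends WriterCommitMessage

// Hypothetical helper standing in for the per-partition DataWriter.commit() calls.
def commitAll(partitions: Int): Array[WriterCommitMessage] =
  Array.fill[WriterCommitMessage](partitions)(KafkaWriterCommitMessage)

val messages = commitAll(3)
// Every element is the same singleton instance.
println(messages.forall(_ eq KafkaWriterCommitMessage))
```

A case object also gets `equals`, `hashCode` and a readable `toString` for free. Callers can match on it directly, for example `case KafkaWriterCommitMessage => ...`.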
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159568750 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159568798 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159569925 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159571068 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSinkV2.scala --- @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.kafka010 + +import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{Row, SparkSession} +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} +import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} +import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter +import org.apache.spark.sql.sources.v2.writer._ +import org.apache.spark.sql.streaming.OutputMode +import org.apache.spark.sql.types.{BinaryType, StringType, StructType} + +class ContinuousKafkaWriter( +topic: Option[String], producerParams: Map[String, String], schema: StructType) + extends ContinuousWriter with SupportsWriteInternalRow { + + override def createInternalRowWriterFactory(): KafkaWriterFactory = +KafkaWriterFactory(topic, producerParams, schema) + + override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} + override def abort(messages: Array[WriterCommitMessage]): Unit = {} +} + +case class KafkaWriterFactory( --- End diff -- I know that there are docs in the DataSourceV2 API classes, but can you add brief docs to these classes to make it easier to understand how these classes relate to each other and how they are used? Otherwise, jumping back and forth to the superclasses to read the docs is tricky. ---
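One way to address the request for brief class docs is a short Scaladoc on each class that states its role and names the class it hands off to. The sketch below uses simplified, hypothetical stand-ins for the DataSourceV2 writer interfaces. Records are `String` instead of `InternalRow`, and nothing is actually sent to Kafka. The lifecycle in the comments follows the general DataSourceV2 contract: a driver-side writer, a factory serialized to executors, one data writer per partition attempt, and a driver-side commit per epoch. That wording is not taken from this PR.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified stand-ins (assumptions) for the DataSourceV2 writer interfaces.
trait WriterCommitMessage extends Serializable
trait DataWriter[T] {
  def write(record: T): Unit
  def commit(): WriterCommitMessage
  def abort(): Unit
}
trait DataWriterFactory[T] extends Serializable {
  def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[T]
}

/** Returned by each [[KafkaDataWriter]] to the driver when its part of an epoch succeeds. */
case object KafkaWriterCommitMessage extends WriterCommitMessage

/**
 * Driver side. Creates the [[KafkaWriterFactory]] that Spark ships to executors, and
 * receives one commit message per partition once an epoch has been written.
 */
class ContinuousKafkaWriter(topic: Option[String]) {
  def createWriterFactory(): KafkaWriterFactory = KafkaWriterFactory(topic)
  def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {}
}

/**
 * Serialized to executors. Each task calls `createDataWriter` once to get the
 * [[KafkaDataWriter]] for its partition and attempt.
 */
case class KafkaWriterFactory(topic: Option[String]) extends DataWriterFactory[String] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[String] =
    new KafkaDataWriter(topic, partitionId)
}

/**
 * Executor side. Writes the rows of one partition; this sketch only buffers them
 * instead of producing to Kafka.
 */
class KafkaDataWriter(topic: Option[String], val partitionId: Int) extends DataWriter[String] {
  val buffered = ArrayBuffer.empty[String]
  override def write(record: String): Unit = buffered += record
  override def commit(): WriterCommitMessage = KafkaWriterCommitMessage
  override def abort(): Unit = buffered.clear()
}

val factory = new ContinuousKafkaWriter(Some("events")).createWriterFactory()
val dataWriter = factory.createDataWriter(partitionId = 0, attemptNumber = 0)
dataWriter.write("a")
dataWriter.write("b")
val commitMessage = dataWriter.commit()
```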
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159566309 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, +executorKafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) +}.reduce(_ ++ _) +KafkaSourceOffset(mergedMap) + } + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private lazy val pollTimeoutMs = sourceOptions.getOrElse( +"kafkaConsumer.pollTimeoutMs", +sc.conf.getTimeAsMs("spark.network.timeout", "120s").toString + ).toLong + + private val maxOffsetsPerTrigger = +sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = { + val offsets = initialOffsets match { +case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) +case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) +case SpecificOffsetRangeLimit(p) => fetchAndVerify(p) + } + logInfo(s"Initial offsets: $offsets") + offsets.partitionToOffsets + } + + private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = { --- End diff -- This function seems to be a duplicate of the one in KafkaSource. Can you dedup? Maybe move it into KafkaOffsetReader? ---
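Here is one possible shape for the requested dedup. The verification moves into a single helper that both `KafkaSource` and `ContinuousKafkaReader` could call, each passing in its own `reportDataLoss`. The review suggests `KafkaOffsetReader` as its home. In this sketch the fetch step is a function parameter, so the code does not depend on `KafkaOffsetReader` itself. The following are assumptions for illustration: the helper name, its function-typed parameters, the `TopicPartition` stand-in, and the sentinel values (assumed to match `KafkaOffsetRangeLimit.LATEST = -1` and `EARLIEST = -2`).

```scala
import scala.collection.mutable.ArrayBuffer

// Stand-in (assumption) for org.apache.kafka.common.TopicPartition.
final case class TopicPartition(topic: String, partition: Int)

// Assumed to mirror KafkaOffsetRangeLimit.LATEST / EARLIEST in the Spark Kafka source.
object OffsetSentinels {
  val Latest: Long = -1L
  val Earliest: Long = -2L
}

/** Hypothetical shared home for the logic duplicated in KafkaSource and ContinuousKafkaReader. */
object OffsetVerification {
  def fetchAndVerify(
      specificOffsets: Map[TopicPartition, Long],
      fetchSpecificOffsets: Map[TopicPartition, Long] => Map[TopicPartition, Long],
      reportDataLoss: String => Unit): Map[TopicPartition, Long] = {
    val result = fetchSpecificOffsets(specificOffsets)
    specificOffsets.foreach {
      case (tp, off) if off != OffsetSentinels.Latest && off != OffsetSentinels.Earliest =>
        // An explicit offset that the consumer did not honor means data may be missing.
        if (result(tp) != off) {
          reportDataLoss(s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}")
        }
      case _ =>
        // No real way to check that the beginning or end is reasonable.
    }
    result
  }
}

val p0 = TopicPartition("events", 0)
val p1 = TopicPartition("events", 1)
val reported = ArrayBuffer.empty[String]
// Simulate a consumer that resolved partition 0 to 7 instead of the requested 5.
val resolved = OffsetVerification.fetchAndVerify(
  Map(p0 -> 5L, p1 -> OffsetSentinels.Latest),
  _ => Map(p0 -> 7L, p1 -> 42L),
  msg => reported += msg)
```

Passing `reportDataLoss` in lets each caller keep its own `failOnDataLoss` handling. The offset check itself then lives in one place.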
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159570879 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159568788 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159568756 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, +executorKafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) +}.reduce(_ ++ _) +KafkaSourceOffset(mergedMap) + } + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private lazy val pollTimeoutMs = sourceOptions.getOrElse( +"kafkaConsumer.pollTimeoutMs", +sc.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString + ).toLong + + private val maxOffsetsPerTrigger = +sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = { + val offsets = initialOffsets match { +case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) +case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) +case SpecificOffsetRangeLimit(p) => fetchAndVerify(p) + } + logInfo(s"Initial offsets: $offsets") + offsets.partitionToOffsets + } + + private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = { +val result = kafkaReader.fetchSpecificOffsets(specificOffsets) +specificOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && +off != KafkaOffsetRangeLimit.EARLIEST => +if (result(tp) != off) { + reportDataLoss( +s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}") +} + case _ => + // no real way to check that beginning or end is reasonable +} +KafkaSourceOffset(result) + } + + // Initialized when creating read tasks. If this diverges from the partitions at the
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159503021 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( --- End diff -- Add docs explaining params.
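One possible response to the request for parameter docs is to put Scaladoc `@param` tags on the constructor. The sketch below is a hypothetical, simplified stand-in, not the real reader, and makes these changes:

* It documents only the parameters whose role is visible in the quoted diff, so `executorKafkaParams` and `metadataPath` are omitted.
* It replaces `KafkaOffsetReader` with a plain function.
* It takes the `spark.network.timeout`-derived default as an explicit argument instead of reading it from `SparkContext` configuration.

```scala
/**
 * Hypothetical, simplified stand-in for the continuous Kafka reader, showing one
 * way its constructor parameters could be documented.
 *
 * @param offsetFetcher        driver-side lookup of the latest offset per partition
 *                             (stands in for the `KafkaOffsetReader` argument)
 * @param sourceOptions        user-supplied source options, such as
 *                             `kafkaConsumer.pollTimeoutMs` and `maxOffsetsPerTrigger`
 * @param defaultPollTimeoutMs poll timeout used when `kafkaConsumer.pollTimeoutMs` is
 *                             absent (the diff derives it from `spark.network.timeout`)
 * @param failOnDataLoss       whether to fail the query when it is possible that data
 *                             was lost, rather than only logging a warning
 */
class DocumentedReaderSketch(
    offsetFetcher: () => Map[String, Long],
    sourceOptions: Map[String, String],
    defaultPollTimeoutMs: Long,
    val failOnDataLoss: Boolean) {

  /** Poll timeout, resolved the way `pollTimeoutMs` is in the quoted diff. */
  val pollTimeoutMs: Long =
    sourceOptions.getOrElse("kafkaConsumer.pollTimeoutMs", defaultPollTimeoutMs.toString).toLong

  /** Optional per-trigger limit, parsed the way `maxOffsetsPerTrigger` is in the quoted diff. */
  val maxOffsetsPerTrigger: Option[Long] =
    sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong)

  /** Delegates to the injected lookup, standing in for a driver-side offset fetch. */
  def latestOffsets(): Map[String, Long] = offsetFetcher()
}
```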
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159571114 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( --- End diff -- Can we rename this and other classes such that all Kafka classes start with "Kafka"?
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159566461 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, +executorKafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) +}.reduce(_ ++ _) +KafkaSourceOffset(mergedMap) + } + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private lazy val pollTimeoutMs = sourceOptions.getOrElse( +"kafkaConsumer.pollTimeoutMs", +sc.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString + ).toLong + + private val maxOffsetsPerTrigger = +sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = { --- End diff -- is this used anywhere?? --- - To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org For additional commands, e-mail: reviews-h...@spark.apache.org
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159569751 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, +executorKafkaParams: java.util.Map[String, Object], +sourceOptions: Map[String, String], +metadataPath: String, +initialOffsets: KafkaOffsetRangeLimit, +failOnDataLoss: Boolean) + extends ContinuousReader with SupportsScanUnsafeRow with Logging { + + override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { +val mergedMap = offsets.map { + case KafkaSourcePartitionOffset(p, o) => Map(p -> o) +}.reduce(_ ++ _) +KafkaSourceOffset(mergedMap) + } + + private lazy val session = SparkSession.getActiveSession.get + private lazy val sc = session.sparkContext + + private lazy val pollTimeoutMs = sourceOptions.getOrElse( +"kafkaConsumer.pollTimeoutMs", +sc.conf.getTimeAsMs("spark.network.timeout", 
"120s").toString + ).toLong + + private val maxOffsetsPerTrigger = +sourceOptions.get("maxOffsetsPerTrigger").map(_.toLong) + + /** + * Lazily initialize `initialPartitionOffsets` to make sure that `KafkaConsumer.poll` is only + * called in StreamExecutionThread. Otherwise, interrupting a thread while running + * `KafkaConsumer.poll` may hang forever (KAFKA-1894). + */ + private lazy val initialPartitionOffsets = { + val offsets = initialOffsets match { +case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) +case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) +case SpecificOffsetRangeLimit(p) => fetchAndVerify(p) + } + logInfo(s"Initial offsets: $offsets") + offsets.partitionToOffsets + } + + private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = { +val result = kafkaReader.fetchSpecificOffsets(specificOffsets) +specificOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && +off != KafkaOffsetRangeLimit.EARLIEST => +if (result(tp) != off) { + reportDataLoss( +s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}") +} + case _ => + // no real way to check that beginning or end is reasonable +} +KafkaSourceOffset(result) + } + + // Initialized when creating read tasks. If this diverges from the partitions at the
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159731324 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala --- @@ -181,26 +223,22 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } } - private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = { -val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } -if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { - throw new IllegalArgumentException( -s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " - + "are serialized with ByteArraySerializer.") -} + override def createContinuousWriter( + queryId: String, + schema: StructType, + mode: OutputMode, + options: DataSourceV2Options): java.util.Optional[ContinuousWriter] = { +import scala.collection.JavaConverters._ -if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) -{ - throw new IllegalArgumentException( -s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " - + "value are serialized with ByteArraySerializer.") -} -parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, -ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) +val spark = SparkSession.getActiveSession.get +val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) +// We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. 
+val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) + +KafkaWriter.validateQuery( + schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic) + +java.util.Optional.of(new ContinuousKafkaWriter(topic, producerParams, schema)) --- End diff -- import java.util.Optional. It's being used in multiple places in this file.
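The following is a sketch of the suggested cleanup, assuming only the JDK: import `java.util.Optional` once and refer to it unqualified. It also shows a hypothetical helper that converts a Java `Optional[String]` directly into a trimmed Scala `Option`. This is an alternative to the `Option(... .orElse(null)).map(_.trim)` round trip in the diff. The helper names are illustrative and not part of the PR.

```scala
import java.util.Optional

object OptionalSketch {
  // With java.util.Optional imported once, call sites use `Optional.of` / `Optional.empty`
  // instead of repeating the fully qualified `java.util.Optional`.
  def describeWriter(topic: Option[String]): Optional[String] =
    topic.fold(Optional.empty[String]())(t => Optional.of(s"writer-for-$t"))

  // Converts a Java Optional into a trimmed Scala Option without going through null.
  def trimmedTopic(raw: Optional[String]): Option[String] =
    if (raw.isPresent) Some(raw.get().trim) else None
}
```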
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159733170 --- Diff: external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/ContinuousKafkaSuite.scala --- @@ -0,0 +1,264 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.sql.kafka010 + +import java.util.Properties + +import org.scalatest.time.SpanSugar._ +import scala.collection.mutable +import scala.util.Random + +import org.apache.spark.SparkContext +import org.apache.spark.sql.{ForeachWriter, Row} +import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation +import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingExecutionRelation, StreamingQueryWrapper} +import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution +import org.apache.spark.sql.streaming.Trigger +import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} + +class ContinuousKafkaSuite extends KafkaSourceTest with SharedSQLContext { --- End diff -- Rename to KafkaContinuousSuite to make it easier to find. 
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159504846 --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala --- @@ -31,10 +32,14 @@ case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) exte override val json = JsonUtils.partitionOffsets(partitionToOffsets) } +private[kafka010] +case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) + extends PartitionOffset + /** Companion object of the [[KafkaSourceOffset]] */ private[kafka010] object KafkaSourceOffset { - def getPartitionOffsets(offset: Offset): Map[TopicPartition, Long] = { + def getPartitionOffsets(offset: LegacyOffset): Map[TopicPartition, Long] = { --- End diff -- nit: can we use OffsetV1 or something like that to make the difference more obvious
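The `LegacyOffset` name in this diff suggests that two classes both called `Offset` are in scope: the V1 streaming one and the DataSourceV2 one. Scala's import renaming lets a single file refer to both under distinct, versioned names such as `OffsetV1`. The sketch uses two hypothetical objects in place of the real packages. The conversion helper is illustrative only.

```scala
// Two hypothetical objects standing in for the packages that each define an `Offset`:
// the V1 streaming offset and the DataSourceV2 offset.
object streamingV1 { abstract class Offset { def json: String } }
object sourcesV2 { abstract class Offset { def json: String } }

// Renaming on import gives the older class an explicit, versioned name,
// so both can be used in one file without ambiguity.
import streamingV1.{Offset => OffsetV1}
import sourcesV2.Offset

final case class LongOffsetV1(value: Long) extends OffsetV1 { def json: String = value.toString }
final case class LongOffsetV2(value: Long) extends Offset { def json: String = value.toString }

object OffsetConversionSketch {
  // Illustrative conversion that round-trips through the JSON form.
  def toV2(old: OffsetV1): Offset = LongOffsetV2(old.json.toLong)
}
```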
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
Github user tdas commented on a diff in the pull request: https://github.com/apache/spark/pull/20096#discussion_r159563923 --- Diff: external/kafka-0-10-sql/src/main/scala/ContinuousKafkaReader.scala --- @@ -0,0 +1,246 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark.sql.kafka010 + +import java.io._ +import java.nio.charset.StandardCharsets +import java.util.concurrent.atomic.AtomicBoolean + +import org.apache.commons.io.IOUtils +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.TopicPartition +import org.apache.kafka.common.errors.WakeupException + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.{DataFrame, Row, SparkSession, SQLContext} +import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection, UnsafeRow} +import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} +import org.apache.spark.sql.catalyst.util.DateTimeUtils +import org.apache.spark.sql.execution.streaming.{HDFSMetadataLog, SerializedOffset} +import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE, VERSION} +import org.apache.spark.sql.sources.v2.reader._ +import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} +import org.apache.spark.sql.types.StructType +import org.apache.spark.unsafe.types.UTF8String + +class ContinuousKafkaReader( +kafkaReader: KafkaOffsetReader, --- End diff -- Please rename this to offsetReader or maybe offsetFetcher to distinguish this from all the Reader classes in DataSourceV2
[GitHub] spark pull request #20096: [SPARK-22908] Add kafka source and sink for conti...
GitHub user jose-torres opened a pull request: https://github.com/apache/spark/pull/20096

[SPARK-22908] Add kafka source and sink for continuous processing.

## What changes were proposed in this pull request?

Add kafka source and sink for continuous processing. This involves two small changes to the execution engine:

* Bring data reader close() into the normal data reader thread to avoid thread safety issues.
* Fix up the semantics of the RECONFIGURING StreamExecution state. State updates are now atomic, and we don't have to deal with swallowing an exception.

## How was this patch tested?

new unit tests

You can merge this pull request into a Git repository by running:

$ git pull https://github.com/jose-torres/spark continuous-kafka

Alternatively you can review and apply these changes as the patch at:

https://github.com/apache/spark/pull/20096.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message: This closes #20096

commit eec38d374a4f3db46a26b8926192ae44da90b6ae
Author: Jose Torres
Date: 2017-12-21T21:07:35Z

basic kafka

commit f91cb0190d5a5d7942cd3d53bc571140a03965c6
Author: Jose Torres
Date: 2017-12-24T21:40:20Z

move reader close to data reader thread in case reader isn't thread safe

commit 7c180db439c9d3c6389c5ff6033a61341e7f1bbf
Author: Jose Torres
Date: 2017-12-27T20:43:16Z

test + small fixes

commit 7596e34f1e8a047263da7bf8522a14869f289125
Author: Jose Torres
Date: 2017-12-27T21:21:56Z

fixes lost in cherrypick
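The two engine changes described in this pull request can be illustrated in isolation, assuming only plain JDK concurrency primitives:

* The reader thread owns its non-thread-safe reader for its whole lifetime and calls `close()` in a `finally` block on that same thread.
* The state transition uses `AtomicReference.compareAndSet`, so only one caller can move the state from active to reconfiguring.

All class names in the sketch are hypothetical. It is a generic sketch, not the `ContinuousExecution` or continuous data reader code from the PR.

```scala
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference}

// Hypothetical reader that is not safe to use from several threads.
// It records which thread closed it so the ownership rule can be checked.
class NonThreadSafeReader {
  private var position = 0L
  @volatile var closedBy: Option[String] = None
  def next(): Long = { position += 1; position }
  def close(): Unit = { closedBy = Some(Thread.currentThread().getName) }
}

// The reading thread owns the reader for its whole lifetime, including close(),
// so no other thread ever calls into it concurrently.
class ReaderThreadSketch(reader: NonThreadSafeReader) extends Thread("data-reader-sketch") {
  private val stopRequested = new AtomicBoolean(false)
  def requestStop(): Unit = stopRequested.set(true)
  override def run(): Unit =
    try {
      while (!stopRequested.get()) reader.next()
    } finally {
      reader.close() // runs on this thread, even if next() throws
    }
}

// Atomic state transition: of several concurrent callers, exactly one wins.
sealed trait QueryState
case object Active extends QueryState
case object Reconfiguring extends QueryState

class QueryStateSketch {
  private val state = new AtomicReference[QueryState](Active)
  def current: QueryState = state.get()
  def tryStartReconfiguring(): Boolean = state.compareAndSet(Active, Reconfiguring)
}

// Usage: another thread may request a stop, but only the reader thread closes the reader.
val reader = new NonThreadSafeReader
val readerThread = new ReaderThreadSketch(reader)
readerThread.start()
readerThread.requestStop()
readerThread.join()
```

`compareAndSet` compares by reference. That is sufficient here because case objects are singletons.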