Revert "[SPARK-22908] Add kafka source and sink for continuous processing."
This reverts commit 6f7aaed805070d29dcba32e04ca7a1f581fa54b9. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55dbfbca Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55dbfbca Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55dbfbca Branch: refs/heads/master Commit: 55dbfbca37ce4c05f83180777ba3d4fe2d96a02e Parents: 5427739 Author: Sameer Agarwal <samee...@apache.org> Authored: Fri Jan 12 15:00:00 2018 -0800 Committer: Sameer Agarwal <samee...@apache.org> Committed: Fri Jan 12 15:00:00 2018 -0800 ---------------------------------------------------------------------- .../sql/kafka010/KafkaContinuousReader.scala | 232 --------- .../sql/kafka010/KafkaContinuousWriter.scala | 119 ----- .../spark/sql/kafka010/KafkaOffsetReader.scala | 21 +- .../apache/spark/sql/kafka010/KafkaSource.scala | 17 +- .../spark/sql/kafka010/KafkaSourceOffset.scala | 7 +- .../sql/kafka010/KafkaSourceProvider.scala | 105 +--- .../spark/sql/kafka010/KafkaWriteTask.scala | 71 ++- .../apache/spark/sql/kafka010/KafkaWriter.scala | 5 +- .../sql/kafka010/KafkaContinuousSinkSuite.scala | 474 ------------------- .../kafka010/KafkaContinuousSourceSuite.scala | 96 ---- .../sql/kafka010/KafkaContinuousTest.scala | 64 --- .../spark/sql/kafka010/KafkaSourceSuite.scala | 470 +++++++++--------- .../org/apache/spark/sql/DataFrameReader.scala | 32 +- .../org/apache/spark/sql/DataFrameWriter.scala | 25 +- .../datasources/v2/WriteToDataSourceV2.scala | 8 +- .../execution/streaming/StreamExecution.scala | 15 +- .../ContinuousDataSourceRDDIter.scala | 3 +- .../continuous/ContinuousExecution.scala | 67 ++- .../streaming/continuous/EpochCoordinator.scala | 21 +- .../spark/sql/streaming/DataStreamWriter.scala | 26 +- .../apache/spark/sql/streaming/StreamTest.scala | 36 +- 21 files changed, 383 insertions(+), 1531 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala deleted file mode 100644 index 9283795..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousReader.scala +++ /dev/null @@ -1,232 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.{util => ju} - -import org.apache.kafka.clients.consumer.ConsumerRecord -import org.apache.kafka.common.TopicPartition - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.SparkSession -import org.apache.spark.sql.catalyst.expressions.UnsafeRow -import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRowWriter} -import org.apache.spark.sql.catalyst.util.DateTimeUtils -import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE} -import org.apache.spark.sql.sources.v2.reader._ -import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset} -import org.apache.spark.sql.types.StructType -import org.apache.spark.unsafe.types.UTF8String - -/** - * A [[ContinuousReader]] for data from kafka. - * - * @param offsetReader a reader used to get kafka offsets. Note that the actual data will be - * read by per-task consumers generated later. - * @param kafkaParams String params for per-task Kafka consumers. - * @param sourceOptions The [[org.apache.spark.sql.sources.v2.DataSourceV2Options]] params which - * are not Kafka consumer params. - * @param metadataPath Path to a directory this reader can use for writing metadata. - * @param initialOffsets The Kafka offsets to start reading data at. - * @param failOnDataLoss Flag indicating whether reading should fail in data loss - * scenarios, where some offsets after the specified initial ones can't be - * properly read. - */ -class KafkaContinuousReader( - offsetReader: KafkaOffsetReader, - kafkaParams: ju.Map[String, Object], - sourceOptions: Map[String, String], - metadataPath: String, - initialOffsets: KafkaOffsetRangeLimit, - failOnDataLoss: Boolean) - extends ContinuousReader with SupportsScanUnsafeRow with Logging { - - private lazy val session = SparkSession.getActiveSession.get - private lazy val sc = session.sparkContext - - // Initialized when creating read tasks. If this diverges from the partitions at the latest - // offsets, we need to reconfigure. - // Exposed outside this object only for unit tests. - private[sql] var knownPartitions: Set[TopicPartition] = _ - - override def readSchema: StructType = KafkaOffsetReader.kafkaSchema - - private var offset: Offset = _ - override def setOffset(start: ju.Optional[Offset]): Unit = { - offset = start.orElse { - val offsets = initialOffsets match { - case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets()) - case LatestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchLatestOffsets()) - case SpecificOffsetRangeLimit(p) => offsetReader.fetchSpecificOffsets(p, reportDataLoss) - } - logInfo(s"Initial offsets: $offsets") - offsets - } - } - - override def getStartOffset(): Offset = offset - - override def deserializeOffset(json: String): Offset = { - KafkaSourceOffset(JsonUtils.partitionOffsets(json)) - } - - override def createUnsafeRowReadTasks(): ju.List[ReadTask[UnsafeRow]] = { - import scala.collection.JavaConverters._ - - val oldStartPartitionOffsets = KafkaSourceOffset.getPartitionOffsets(offset) - - val currentPartitionSet = offsetReader.fetchEarliestOffsets().keySet - val newPartitions = currentPartitionSet.diff(oldStartPartitionOffsets.keySet) - val newPartitionOffsets = offsetReader.fetchEarliestOffsets(newPartitions.toSeq) - - val deletedPartitions = oldStartPartitionOffsets.keySet.diff(currentPartitionSet) - if (deletedPartitions.nonEmpty) { - reportDataLoss(s"Some partitions were deleted: $deletedPartitions") - } - - val startOffsets = newPartitionOffsets ++ - oldStartPartitionOffsets.filterKeys(!deletedPartitions.contains(_)) - knownPartitions = startOffsets.keySet - - startOffsets.toSeq.map { - case (topicPartition, start) => - KafkaContinuousReadTask( - topicPartition, start, kafkaParams, failOnDataLoss) - .asInstanceOf[ReadTask[UnsafeRow]] - }.asJava - } - - /** Stop this source and free any resources it has allocated. */ - def stop(): Unit = synchronized { - offsetReader.close() - } - - override def commit(end: Offset): Unit = {} - - override def mergeOffsets(offsets: Array[PartitionOffset]): Offset = { - val mergedMap = offsets.map { - case KafkaSourcePartitionOffset(p, o) => Map(p -> o) - }.reduce(_ ++ _) - KafkaSourceOffset(mergedMap) - } - - override def needsReconfiguration(): Boolean = { - knownPartitions != null && offsetReader.fetchLatestOffsets().keySet != knownPartitions - } - - override def toString(): String = s"KafkaSource[$offsetReader]" - - /** - * If `failOnDataLoss` is true, this method will throw an `IllegalStateException`. - * Otherwise, just log a warning. - */ - private def reportDataLoss(message: String): Unit = { - if (failOnDataLoss) { - throw new IllegalStateException(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE") - } else { - logWarning(message + s". $INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE") - } - } -} - -/** - * A read task for continuous Kafka processing. This will be serialized and transformed into a - * full reader on executors. - * - * @param topicPartition The (topic, partition) pair this task is responsible for. - * @param startOffset The offset to start reading from within the partition. - * @param kafkaParams Kafka consumer params to use. - * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets - * are skipped. - */ -case class KafkaContinuousReadTask( - topicPartition: TopicPartition, - startOffset: Long, - kafkaParams: ju.Map[String, Object], - failOnDataLoss: Boolean) extends ReadTask[UnsafeRow] { - override def createDataReader(): KafkaContinuousDataReader = { - new KafkaContinuousDataReader(topicPartition, startOffset, kafkaParams, failOnDataLoss) - } -} - -/** - * A per-task data reader for continuous Kafka processing. - * - * @param topicPartition The (topic, partition) pair this data reader is responsible for. - * @param startOffset The offset to start reading from within the partition. - * @param kafkaParams Kafka consumer params to use. - * @param failOnDataLoss Flag indicating whether data reader should fail if some offsets - * are skipped. - */ -class KafkaContinuousDataReader( - topicPartition: TopicPartition, - startOffset: Long, - kafkaParams: ju.Map[String, Object], - failOnDataLoss: Boolean) extends ContinuousDataReader[UnsafeRow] { - private val topic = topicPartition.topic - private val kafkaPartition = topicPartition.partition - private val consumer = CachedKafkaConsumer.createUncached(topic, kafkaPartition, kafkaParams) - - private val sharedRow = new UnsafeRow(7) - private val bufferHolder = new BufferHolder(sharedRow) - private val rowWriter = new UnsafeRowWriter(bufferHolder, 7) - - private var nextKafkaOffset = startOffset - private var currentRecord: ConsumerRecord[Array[Byte], Array[Byte]] = _ - - override def next(): Boolean = { - var r: ConsumerRecord[Array[Byte], Array[Byte]] = null - while (r == null) { - r = consumer.get( - nextKafkaOffset, - untilOffset = Long.MaxValue, - pollTimeoutMs = Long.MaxValue, - failOnDataLoss) - } - nextKafkaOffset = r.offset + 1 - currentRecord = r - true - } - - override def get(): UnsafeRow = { - bufferHolder.reset() - - if (currentRecord.key == null) { - rowWriter.setNullAt(0) - } else { - rowWriter.write(0, currentRecord.key) - } - rowWriter.write(1, currentRecord.value) - rowWriter.write(2, UTF8String.fromString(currentRecord.topic)) - rowWriter.write(3, currentRecord.partition) - rowWriter.write(4, currentRecord.offset) - rowWriter.write(5, - DateTimeUtils.fromJavaTimestamp(new java.sql.Timestamp(currentRecord.timestamp))) - rowWriter.write(6, currentRecord.timestampType.id) - sharedRow.setTotalSize(bufferHolder.totalSize) - sharedRow - } - - override def getOffset(): KafkaSourcePartitionOffset = { - KafkaSourcePartitionOffset(topicPartition, nextKafkaOffset) - } - - override def close(): Unit = { - consumer.close() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala deleted file mode 100644 index 9843f46..0000000 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaContinuousWriter.scala +++ /dev/null @@ -1,119 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import org.apache.kafka.clients.producer.{Callback, ProducerRecord, RecordMetadata} -import scala.collection.JavaConverters._ - -import org.apache.spark.internal.Logging -import org.apache.spark.sql.{Row, SparkSession} -import org.apache.spark.sql.catalyst.InternalRow -import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal, UnsafeProjection} -import org.apache.spark.sql.kafka010.KafkaSourceProvider.{kafkaParamsForProducer, TOPIC_OPTION_KEY} -import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery -import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter -import org.apache.spark.sql.sources.v2.writer._ -import org.apache.spark.sql.streaming.OutputMode -import org.apache.spark.sql.types.{BinaryType, StringType, StructType} - -/** - * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we - * don't need to really send one. - */ -case object KafkaWriterCommitMessage extends WriterCommitMessage - -/** - * A [[ContinuousWriter]] for Kafka writing. Responsible for generating the writer factory. - * @param topic The topic this writer is responsible for. If None, topic will be inferred from - * a `topic` field in the incoming data. - * @param producerParams Parameters for Kafka producers in each task. - * @param schema The schema of the input data. - */ -class KafkaContinuousWriter( - topic: Option[String], producerParams: Map[String, String], schema: StructType) - extends ContinuousWriter with SupportsWriteInternalRow { - - validateQuery(schema.toAttributes, producerParams.toMap[String, Object].asJava, topic) - - override def createInternalRowWriterFactory(): KafkaContinuousWriterFactory = - KafkaContinuousWriterFactory(topic, producerParams, schema) - - override def commit(epochId: Long, messages: Array[WriterCommitMessage]): Unit = {} - override def abort(messages: Array[WriterCommitMessage]): Unit = {} -} - -/** - * A [[DataWriterFactory]] for Kafka writing. Will be serialized and sent to executors to generate - * the per-task data writers. - * @param topic The topic that should be written to. If None, topic will be inferred from - * a `topic` field in the incoming data. - * @param producerParams Parameters for Kafka producers in each task. - * @param schema The schema of the input data. - */ -case class KafkaContinuousWriterFactory( - topic: Option[String], producerParams: Map[String, String], schema: StructType) - extends DataWriterFactory[InternalRow] { - - override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[InternalRow] = { - new KafkaContinuousDataWriter(topic, producerParams, schema.toAttributes) - } -} - -/** - * A [[DataWriter]] for Kafka writing. One data writer will be created in each partition to - * process incoming rows. - * - * @param targetTopic The topic that this data writer is targeting. If None, topic will be inferred - * from a `topic` field in the incoming data. - * @param producerParams Parameters to use for the Kafka producer. - * @param inputSchema The attributes in the input data. - */ -class KafkaContinuousDataWriter( - targetTopic: Option[String], producerParams: Map[String, String], inputSchema: Seq[Attribute]) - extends KafkaRowWriter(inputSchema, targetTopic) with DataWriter[InternalRow] { - import scala.collection.JavaConverters._ - - private lazy val producer = CachedKafkaProducer.getOrCreate( - new java.util.HashMap[String, Object](producerParams.asJava)) - - def write(row: InternalRow): Unit = { - checkForErrors() - sendRow(row, producer) - } - - def commit(): WriterCommitMessage = { - // Send is asynchronous, but we can't commit until all rows are actually in Kafka. - // This requires flushing and then checking that no callbacks produced errors. - // We also check for errors before to fail as soon as possible - the check is cheap. - checkForErrors() - producer.flush() - checkForErrors() - KafkaWriterCommitMessage - } - - def abort(): Unit = {} - - def close(): Unit = { - checkForErrors() - if (producer != null) { - producer.flush() - checkForErrors() - CachedKafkaProducer.close(new java.util.HashMap[String, Object](producerParams.asJava)) - } - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala index 551641c..3e65949 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetReader.scala @@ -117,14 +117,10 @@ private[kafka010] class KafkaOffsetReader( * Resolves the specific offsets based on Kafka seek positions. * This method resolves offset value -1 to the latest and -2 to the * earliest Kafka seek position. - * - * @param partitionOffsets the specific offsets to resolve - * @param reportDataLoss callback to either report or log data loss depending on setting */ def fetchSpecificOffsets( - partitionOffsets: Map[TopicPartition, Long], - reportDataLoss: String => Unit): KafkaSourceOffset = { - val fetched = runUninterruptibly { + partitionOffsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = + runUninterruptibly { withRetriesWithoutInterrupt { // Poll to get the latest assigned partitions consumer.poll(0) @@ -149,19 +145,6 @@ private[kafka010] class KafkaOffsetReader( } } - partitionOffsets.foreach { - case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && - off != KafkaOffsetRangeLimit.EARLIEST => - if (fetched(tp) != off) { - reportDataLoss( - s"startingOffsets for $tp was $off but consumer reset to ${fetched(tp)}") - } - case _ => - // no real way to check that beginning or end is reasonable - } - KafkaSourceOffset(fetched) - } - /** * Fetch the earliest offsets for the topic partitions that are indicated * in the [[ConsumerStrategy]]. http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala index 27da760..e9cff04 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSource.scala @@ -130,7 +130,7 @@ private[kafka010] class KafkaSource( val offsets = startingOffsets match { case EarliestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchEarliestOffsets()) case LatestOffsetRangeLimit => KafkaSourceOffset(kafkaReader.fetchLatestOffsets()) - case SpecificOffsetRangeLimit(p) => kafkaReader.fetchSpecificOffsets(p, reportDataLoss) + case SpecificOffsetRangeLimit(p) => fetchAndVerify(p) } metadataLog.add(0, offsets) logInfo(s"Initial offsets: $offsets") @@ -138,6 +138,21 @@ private[kafka010] class KafkaSource( }.partitionToOffsets } + private def fetchAndVerify(specificOffsets: Map[TopicPartition, Long]) = { + val result = kafkaReader.fetchSpecificOffsets(specificOffsets) + specificOffsets.foreach { + case (tp, off) if off != KafkaOffsetRangeLimit.LATEST && + off != KafkaOffsetRangeLimit.EARLIEST => + if (result(tp) != off) { + reportDataLoss( + s"startingOffsets for $tp was $off but consumer reset to ${result(tp)}") + } + case _ => + // no real way to check that beginning or end is reasonable + } + KafkaSourceOffset(result) + } + private var currentPartitionOffsets: Option[Map[TopicPartition, Long]] = None override def schema: StructType = KafkaOffsetReader.kafkaSchema http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala index c82154c..b5da415 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceOffset.scala @@ -20,22 +20,17 @@ package org.apache.spark.sql.kafka010 import org.apache.kafka.common.TopicPartition import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset} -import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset} /** * An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and * their offsets. */ private[kafka010] -case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 { +case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends Offset { override val json = JsonUtils.partitionOffsets(partitionToOffsets) } -private[kafka010] -case class KafkaSourcePartitionOffset(topicPartition: TopicPartition, partitionOffset: Long) - extends PartitionOffset - /** Companion object of the [[KafkaSourceOffset]] */ private[kafka010] object KafkaSourceOffset { http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala index 3914370..3cb4d8c 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaSourceProvider.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.kafka010 import java.{util => ju} -import java.util.{Locale, Optional, UUID} +import java.util.{Locale, UUID} import scala.collection.JavaConverters._ @@ -27,12 +27,9 @@ import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.common.serialization.{ByteArrayDeserializer, ByteArraySerializer} import org.apache.spark.internal.Logging -import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSession, SQLContext} -import org.apache.spark.sql.execution.streaming.{Offset, Sink, Source} +import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SQLContext} +import org.apache.spark.sql.execution.streaming.{Sink, Source} import org.apache.spark.sql.sources._ -import org.apache.spark.sql.sources.v2.{DataSourceV2, DataSourceV2Options} -import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, ContinuousWriteSupport} -import org.apache.spark.sql.sources.v2.streaming.writer.ContinuousWriter import org.apache.spark.sql.streaming.OutputMode import org.apache.spark.sql.types.StructType @@ -46,8 +43,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister with StreamSinkProvider with RelationProvider with CreatableRelationProvider - with ContinuousWriteSupport - with ContinuousReadSupport with Logging { import KafkaSourceProvider._ @@ -106,43 +101,6 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister failOnDataLoss(caseInsensitiveParams)) } - override def createContinuousReader( - schema: Optional[StructType], - metadataPath: String, - options: DataSourceV2Options): KafkaContinuousReader = { - val parameters = options.asMap().asScala.toMap - validateStreamOptions(parameters) - // Each running query should use its own group id. Otherwise, the query may be only assigned - // partial data since Kafka will assign partitions to multiple consumers having the same group - // id. Hence, we should generate a unique id for each query. - val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}" - - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - val specifiedKafkaParams = - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap - - val startingStreamOffsets = KafkaSourceProvider.getKafkaOffsetRangeLimit(caseInsensitiveParams, - STARTING_OFFSETS_OPTION_KEY, LatestOffsetRangeLimit) - - val kafkaOffsetReader = new KafkaOffsetReader( - strategy(caseInsensitiveParams), - kafkaParamsForDriver(specifiedKafkaParams), - parameters, - driverGroupIdPrefix = s"$uniqueGroupId-driver") - - new KafkaContinuousReader( - kafkaOffsetReader, - kafkaParamsForExecutors(specifiedKafkaParams, uniqueGroupId), - parameters, - metadataPath, - startingStreamOffsets, - failOnDataLoss(caseInsensitiveParams)) - } - /** * Returns a new base relation with the given parameters. * @@ -223,22 +181,26 @@ private[kafka010] class KafkaSourceProvider extends DataSourceRegister } } - override def createContinuousWriter( - queryId: String, - schema: StructType, - mode: OutputMode, - options: DataSourceV2Options): Optional[ContinuousWriter] = { - import scala.collection.JavaConverters._ - - val spark = SparkSession.getActiveSession.get - val topic = Option(options.get(TOPIC_OPTION_KEY).orElse(null)).map(_.trim) - // We convert the options argument from V2 -> Java map -> scala mutable -> scala immutable. - val producerParams = kafkaParamsForProducer(options.asMap.asScala.toMap) - - KafkaWriter.validateQuery( - schema.toAttributes, new java.util.HashMap[String, Object](producerParams.asJava), topic) + private def kafkaParamsForProducer(parameters: Map[String, String]): Map[String, String] = { + val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } + if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { + throw new IllegalArgumentException( + s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " + + "are serialized with ByteArraySerializer.") + } - Optional.of(new KafkaContinuousWriter(topic, producerParams, schema)) + if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) + { + throw new IllegalArgumentException( + s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " + + "value are serialized with ByteArraySerializer.") + } + parameters + .keySet + .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) + .map { k => k.drop(6).toString -> parameters(k) } + .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) } private def strategy(caseInsensitiveParams: Map[String, String]) = @@ -488,27 +450,4 @@ private[kafka010] object KafkaSourceProvider extends Logging { def build(): ju.Map[String, Object] = map } - - private[kafka010] def kafkaParamsForProducer( - parameters: Map[String, String]): Map[String, String] = { - val caseInsensitiveParams = parameters.map { case (k, v) => (k.toLowerCase(Locale.ROOT), v) } - if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}")) { - throw new IllegalArgumentException( - s"Kafka option '${ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG}' is not supported as keys " - + "are serialized with ByteArraySerializer.") - } - - if (caseInsensitiveParams.contains(s"kafka.${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}")) - { - throw new IllegalArgumentException( - s"Kafka option '${ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG}' is not supported as " - + "value are serialized with ByteArraySerializer.") - } - parameters - .keySet - .filter(_.toLowerCase(Locale.ROOT).startsWith("kafka.")) - .map { k => k.drop(6).toString -> parameters(k) } - .toMap + (ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName, - ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[ByteArraySerializer].getName) - } } http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala index baa60fe..6fd333e 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala @@ -33,8 +33,10 @@ import org.apache.spark.sql.types.{BinaryType, StringType} private[kafka010] class KafkaWriteTask( producerConfiguration: ju.Map[String, Object], inputSchema: Seq[Attribute], - topic: Option[String]) extends KafkaRowWriter(inputSchema, topic) { + topic: Option[String]) { // used to synchronize with Kafka callbacks + @volatile private var failedWrite: Exception = null + private val projection = createProjection private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _ /** @@ -44,7 +46,23 @@ private[kafka010] class KafkaWriteTask( producer = CachedKafkaProducer.getOrCreate(producerConfiguration) while (iterator.hasNext && failedWrite == null) { val currentRow = iterator.next() - sendRow(currentRow, producer) + val projectedRow = projection(currentRow) + val topic = projectedRow.getUTF8String(0) + val key = projectedRow.getBinary(1) + val value = projectedRow.getBinary(2) + if (topic == null) { + throw new NullPointerException(s"null topic present in the data. Use the " + + s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") + } + val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) + val callback = new Callback() { + override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { + if (failedWrite == null && e != null) { + failedWrite = e + } + } + } + producer.send(record, callback) } } @@ -56,49 +74,8 @@ private[kafka010] class KafkaWriteTask( producer = null } } -} - -private[kafka010] abstract class KafkaRowWriter( - inputSchema: Seq[Attribute], topic: Option[String]) { - - // used to synchronize with Kafka callbacks - @volatile protected var failedWrite: Exception = _ - protected val projection = createProjection - - private val callback = new Callback() { - override def onCompletion(recordMetadata: RecordMetadata, e: Exception): Unit = { - if (failedWrite == null && e != null) { - failedWrite = e - } - } - } - /** - * Send the specified row to the producer, with a callback that will save any exception - * to failedWrite. Note that send is asynchronous; subclasses must flush() their producer before - * assuming the row is in Kafka. - */ - protected def sendRow( - row: InternalRow, producer: KafkaProducer[Array[Byte], Array[Byte]]): Unit = { - val projectedRow = projection(row) - val topic = projectedRow.getUTF8String(0) - val key = projectedRow.getBinary(1) - val value = projectedRow.getBinary(2) - if (topic == null) { - throw new NullPointerException(s"null topic present in the data. Use the " + - s"${KafkaSourceProvider.TOPIC_OPTION_KEY} option for setting a default topic.") - } - val record = new ProducerRecord[Array[Byte], Array[Byte]](topic.toString, key, value) - producer.send(record, callback) - } - - protected def checkForErrors(): Unit = { - if (failedWrite != null) { - throw failedWrite - } - } - - private def createProjection = { + private def createProjection: UnsafeProjection = { val topicExpression = topic.map(Literal(_)).orElse { inputSchema.find(_.name == KafkaWriter.TOPIC_ATTRIBUTE_NAME) }.getOrElse { @@ -135,5 +112,11 @@ private[kafka010] abstract class KafkaRowWriter( Seq(topicExpression, Cast(keyExpression, BinaryType), Cast(valueExpression, BinaryType)), inputSchema) } + + private def checkForErrors(): Unit = { + if (failedWrite != null) { + throw failedWrite + } + } } http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala index 15cd448..5e9ae35 100644 --- a/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala +++ b/external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriter.scala @@ -43,9 +43,10 @@ private[kafka010] object KafkaWriter extends Logging { override def toString: String = "KafkaWriter" def validateQuery( - schema: Seq[Attribute], + queryExecution: QueryExecution, kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { + val schema = queryExecution.analyzed.output schema.find(_.name == TOPIC_ATTRIBUTE_NAME).getOrElse( if (topic.isEmpty) { throw new AnalysisException(s"topic option required when no " + @@ -83,7 +84,7 @@ private[kafka010] object KafkaWriter extends Logging { kafkaParameters: ju.Map[String, Object], topic: Option[String] = None): Unit = { val schema = queryExecution.analyzed.output - validateQuery(schema, kafkaParameters, topic) + validateQuery(queryExecution, kafkaParameters, topic) queryExecution.toRdd.foreachPartition { iter => val writeTask = new KafkaWriteTask(kafkaParameters, schema, topic) Utils.tryWithSafeFinally(block = writeTask.execute(iter))( http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala deleted file mode 100644 index dfc97b1..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSinkSuite.scala +++ /dev/null @@ -1,474 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.util.Locale -import java.util.concurrent.atomic.AtomicInteger - -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.ByteArraySerializer -import org.scalatest.time.SpanSugar._ -import scala.collection.JavaConverters._ - -import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode} -import org.apache.spark.sql.catalyst.expressions.{AttributeReference, SpecificInternalRow, UnsafeProjection} -import org.apache.spark.sql.execution.streaming.MemoryStream -import org.apache.spark.sql.streaming._ -import org.apache.spark.sql.types.{BinaryType, DataType} -import org.apache.spark.util.Utils - -/** - * This is a temporary port of KafkaSinkSuite, since we do not yet have a V2 memory stream. - * Once we have one, this will be changed to a specialization of KafkaSinkSuite and we won't have - * to duplicate all the code. - */ -class KafkaContinuousSinkSuite extends KafkaContinuousTest { - import testImplicits._ - - override val streamingTimeout = 30.seconds - - override def beforeAll(): Unit = { - super.beforeAll() - testUtils = new KafkaTestUtils( - withBrokerProps = Map("auto.create.topics.enable" -> "false")) - testUtils.setup() - } - - override def afterAll(): Unit = { - if (testUtils != null) { - testUtils.teardown() - testUtils = null - } - super.afterAll() - } - - test("streaming - write to kafka with topic field") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = None, - withOutputMode = Some(OutputMode.Append))( - withSelectExpr = s"'$topic' as topic", "value") - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") - .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") - .as[(Int, Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } finally { - writer.stop() - } - } - - test("streaming - write w/o topic field, with topic option") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = Some(topic), - withOutputMode = Some(OutputMode.Append()))() - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key as STRING) key", "CAST(value as STRING) value") - .selectExpr("CAST(key as INT) key", "CAST(value as INT) value") - .as[(Int, Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } finally { - writer.stop() - } - } - - test("streaming - topic field and topic option") { - /* The purpose of this test is to ensure that the topic option - * overrides the topic field. We begin by writing some data that - * includes a topic field and value (e.g., 'foo') along with a topic - * option. Then when we read from the topic specified in the option - * we should see the data i.e., the data was written to the topic - * option, and not to the topic in the data e.g., foo - */ - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - - val topic = newTopic() - testUtils.createTopic(topic) - - val writer = createKafkaWriter( - input.toDF(), - withTopic = Some(topic), - withOutputMode = Some(OutputMode.Append()))( - withSelectExpr = "'foo' as topic", "CAST(value as STRING) value") - - val reader = createKafkaReader(topic) - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .selectExpr("CAST(key AS INT)", "CAST(value AS INT)") - .as[(Int, Int)] - .map(_._2) - - try { - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5) - testUtils.sendMessages(inputTopic, Array("6", "7", "8", "9", "10")) - failAfter(streamingTimeout) { - writer.processAllAvailable() - } - checkDatasetUnorderly(reader, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - } finally { - writer.stop() - } - } - - test("null topic attribute") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - testUtils.createTopic(topic) - - /* No topic field or topic option */ - var writer: StreamingQuery = null - var ex: Exception = null - try { - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF())( - withSelectExpr = "CAST(null as STRING) as topic", "value" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getCause.getCause.getMessage - .toLowerCase(Locale.ROOT) - .contains("null topic present in the data.")) - } - - test("streaming - write data with bad schema") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - testUtils.createTopic(topic) - - /* No topic field or topic option */ - var writer: StreamingQuery = null - var ex: Exception = null - try { - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF())( - withSelectExpr = "value as key", "value" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getMessage - .toLowerCase(Locale.ROOT) - .contains("topic option required when no 'topic' attribute is present")) - - try { - /* No value field */ - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF())( - withSelectExpr = s"'$topic' as topic", "value as key" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( - "required attribute 'value' not found")) - } - - test("streaming - write data with valid schema but wrong types") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - .selectExpr("CAST(value as STRING) value") - val topic = newTopic() - testUtils.createTopic(topic) - - var writer: StreamingQuery = null - var ex: Exception = null - try { - /* topic field wrong type */ - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF())( - withSelectExpr = s"CAST('1' as INT) as topic", "value" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("topic type must be a string")) - - try { - /* value field wrong type */ - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF())( - withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as value" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( - "value attribute type must be a string or binarytype")) - - try { - ex = intercept[StreamingQueryException] { - /* key field wrong type */ - writer = createKafkaWriter(input.toDF())( - withSelectExpr = s"'$topic' as topic", "CAST(value as INT) as key", "value" - ) - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - writer.processAllAvailable() - } - } finally { - writer.stop() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( - "key attribute type must be a string or binarytype")) - } - - test("streaming - write to non-existing topic") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .option("startingOffsets", "earliest") - .load() - val topic = newTopic() - - var writer: StreamingQuery = null - var ex: Exception = null - try { - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter(input.toDF(), withTopic = Some(topic))() - testUtils.sendMessages(inputTopic, Array("1", "2", "3", "4", "5")) - eventually(timeout(streamingTimeout)) { - assert(writer.exception.isDefined) - } - throw writer.exception.get - } - } finally { - writer.stop() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains("job aborted")) - } - - test("streaming - exception on config serializer") { - val inputTopic = newTopic() - testUtils.createTopic(inputTopic, partitions = 1) - testUtils.sendMessages(inputTopic, Array("0")) - - val input = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", inputTopic) - .load() - var writer: StreamingQuery = null - var ex: Exception = null - try { - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter( - input.toDF(), - withOptions = Map("kafka.key.serializer" -> "foo"))() - writer.processAllAvailable() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( - "kafka option 'key.serializer' is not supported")) - } finally { - writer.stop() - } - - try { - ex = intercept[StreamingQueryException] { - writer = createKafkaWriter( - input.toDF(), - withOptions = Map("kafka.value.serializer" -> "foo"))() - writer.processAllAvailable() - } - assert(ex.getMessage.toLowerCase(Locale.ROOT).contains( - "kafka option 'value.serializer' is not supported")) - } finally { - writer.stop() - } - } - - test("generic - write big data with small producer buffer") { - /* This test ensures that we understand the semantics of Kafka when - * is comes to blocking on a call to send when the send buffer is full. - * This test will configure the smallest possible producer buffer and - * indicate that we should block when it is full. Thus, no exception should - * be thrown in the case of a full buffer. - */ - val topic = newTopic() - testUtils.createTopic(topic, 1) - val options = new java.util.HashMap[String, String] - options.put("bootstrap.servers", testUtils.brokerAddress) - options.put("buffer.memory", "16384") // min buffer size - options.put("block.on.buffer.full", "true") - options.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) - options.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[ByteArraySerializer].getName) - val inputSchema = Seq(AttributeReference("value", BinaryType)()) - val data = new Array[Byte](15000) // large value - val writeTask = new KafkaContinuousDataWriter(Some(topic), options.asScala.toMap, inputSchema) - try { - val fieldTypes: Array[DataType] = Array(BinaryType) - val converter = UnsafeProjection.create(fieldTypes) - val row = new SpecificInternalRow(fieldTypes) - row.update(0, data) - val iter = Seq.fill(1000)(converter.apply(row)).iterator - iter.foreach(writeTask.write(_)) - writeTask.commit() - } finally { - writeTask.close() - } - } - - private def createKafkaReader(topic: String): DataFrame = { - spark.read - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("startingOffsets", "earliest") - .option("endingOffsets", "latest") - .option("subscribe", topic) - .load() - } - - private def createKafkaWriter( - input: DataFrame, - withTopic: Option[String] = None, - withOutputMode: Option[OutputMode] = None, - withOptions: Map[String, String] = Map[String, String]()) - (withSelectExpr: String*): StreamingQuery = { - var stream: DataStreamWriter[Row] = null - val checkpointDir = Utils.createTempDir() - var df = input.toDF() - if (withSelectExpr.length > 0) { - df = df.selectExpr(withSelectExpr: _*) - } - stream = df.writeStream - .format("kafka") - .option("checkpointLocation", checkpointDir.getCanonicalPath) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - // We need to reduce blocking time to efficiently test non-existent partition behavior. - .option("kafka.max.block.ms", "1000") - .trigger(Trigger.Continuous(1000)) - .queryName("kafkaStream") - withTopic.foreach(stream.option("topic", _)) - withOutputMode.foreach(stream.outputMode(_)) - withOptions.foreach(opt => stream.option(opt._1, opt._2)) - stream.start() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala deleted file mode 100644 index b3dade4..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousSourceSuite.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import java.util.Properties -import java.util.concurrent.atomic.AtomicInteger - -import org.scalatest.time.SpanSugar._ -import scala.collection.mutable -import scala.util.Random - -import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.streaming.{StreamTest, Trigger} -import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} - -// Run tests in KafkaSourceSuiteBase in continuous execution mode. -class KafkaContinuousSourceSuite extends KafkaSourceSuiteBase with KafkaContinuousTest - -class KafkaContinuousSourceTopicDeletionSuite extends KafkaContinuousTest { - import testImplicits._ - - override val brokerProps = Map("auto.create.topics.enable" -> "false") - - test("subscribing topic by pattern with topic deletions") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-seems" - val topic2 = topicPrefix + "-bad" - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", s"$topicPrefix-.*") - .option("failOnDataLoss", "false") - - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) - - testStream(mapped)( - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - Execute { query => - testUtils.deleteTopic(topic) - testUtils.createTopic(topic2, partitions = 5) - eventually(timeout(streamingTimeout)) { - assert( - query.lastExecution.logical.collectFirst { - case DataSourceV2Relation(_, r: KafkaContinuousReader) => r - }.exists { r => - // Ensure the new topic is present and the old topic is gone. - r.knownPartitions.exists(_.topic == topic2) - }, - s"query never reconfigured to new topic $topic2") - } - }, - AddKafkaData(Set(topic2), 4, 5, 6), - CheckAnswer(2, 3, 4, 5, 6, 7) - ) - } -} - -class KafkaContinuousSourceStressForDontFailOnDataLossSuite - extends KafkaSourceStressForDontFailOnDataLossSuite { - override protected def startStream(ds: Dataset[Int]) = { - ds.writeStream - .format("memory") - .queryName("memory") - .start() - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala deleted file mode 100644 index e713e66..0000000 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaContinuousTest.scala +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.sql.kafka010 - -import org.apache.spark.SparkContext -import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Relation -import org.apache.spark.sql.execution.streaming.StreamExecution -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.streaming.Trigger -import org.apache.spark.sql.test.TestSparkSession - -// Trait to configure StreamTest for kafka continuous execution tests. -trait KafkaContinuousTest extends KafkaSourceTest { - override val defaultTrigger = Trigger.Continuous(1000) - override val defaultUseV2Sink = true - - // We need more than the default local[2] to be able to schedule all partitions simultaneously. - override protected def createSparkSession = new TestSparkSession( - new SparkContext( - "local[10]", - "continuous-stream-test-sql-context", - sparkConf.set("spark.sql.testkey", "true"))) - - // In addition to setting the partitions in Kafka, we have to wait until the query has - // reconfigured to the new count so the test framework can hook in properly. - override protected def setTopicPartitions( - topic: String, newCount: Int, query: StreamExecution) = { - testUtils.addPartitions(topic, newCount) - eventually(timeout(streamingTimeout)) { - assert( - query.lastExecution.logical.collectFirst { - case DataSourceV2Relation(_, r: KafkaContinuousReader) => r - }.exists(_.knownPartitions.size == newCount), - s"query never reconfigured to $newCount partitions") - } - } - - test("ensure continuous stream is being used") { - val query = spark.readStream - .format("rate") - .option("numPartitions", "1") - .option("rowsPerSecond", "1") - .load() - - testStream(query)( - Execute(q => assert(q.isInstanceOf[ContinuousExecution])) - ) - } -} http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala ---------------------------------------------------------------------- diff --git a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala index d66908f8..2034b9b 100644 --- a/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala +++ b/external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaSourceSuite.scala @@ -34,14 +34,11 @@ import org.scalatest.concurrent.PatienceConfiguration.Timeout import org.scalatest.time.SpanSugar._ import org.apache.spark.SparkContext -import org.apache.spark.sql.{DataFrame, Dataset, ForeachWriter, Row} -import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, WriteToDataSourceV2Exec} +import org.apache.spark.sql.ForeachWriter import org.apache.spark.sql.execution.streaming._ -import org.apache.spark.sql.execution.streaming.continuous.ContinuousExecution -import org.apache.spark.sql.execution.streaming.sources.ContinuousMemoryWriter import org.apache.spark.sql.functions.{count, window} import org.apache.spark.sql.kafka010.KafkaSourceProvider._ -import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest, Trigger} +import org.apache.spark.sql.streaming.{ProcessingTime, StreamTest} import org.apache.spark.sql.streaming.util.StreamManualClock import org.apache.spark.sql.test.{SharedSQLContext, TestSparkSession} import org.apache.spark.util.Utils @@ -52,11 +49,9 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { override val streamingTimeout = 30.seconds - protected val brokerProps = Map[String, Object]() - override def beforeAll(): Unit = { super.beforeAll() - testUtils = new KafkaTestUtils(brokerProps) + testUtils = new KafkaTestUtils testUtils.setup() } @@ -64,25 +59,18 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { if (testUtils != null) { testUtils.teardown() testUtils = null + super.afterAll() } - super.afterAll() } protected def makeSureGetOffsetCalled = AssertOnQuery { q => // Because KafkaSource's initialPartitionOffsets is set lazily, we need to make sure - // its "getOffset" is called before pushing any data. Otherwise, because of the race condition, + // its "getOffset" is called before pushing any data. Otherwise, because of the race contion, // we don't know which data should be fetched when `startingOffsets` is latest. - q match { - case c: ContinuousExecution => c.awaitEpoch(0) - case m: MicroBatchExecution => m.processAllAvailable() - } + q.processAllAvailable() true } - protected def setTopicPartitions(topic: String, newCount: Int, query: StreamExecution) : Unit = { - testUtils.addPartitions(topic, newCount) - } - /** * Add data to Kafka. * @@ -94,7 +82,7 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { message: String = "", topicAction: (String, Option[Int]) => Unit = (_, _) => {}) extends AddData { - override def addData(query: Option[StreamExecution]): (BaseStreamingSource, Offset) = { + override def addData(query: Option[StreamExecution]): (Source, Offset) = { if (query.get.isActive) { // Make sure no Spark job is running when deleting a topic query.get.processAllAvailable() @@ -109,18 +97,16 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { topicAction(existingTopicPartitions._1, Some(existingTopicPartitions._2)) } + // Read all topics again in case some topics are delete. + val allTopics = testUtils.getAllTopicsAndPartitionSize().toMap.keys require( query.nonEmpty, "Cannot add data when there is no query for finding the active kafka source") val sources = query.get.logicalPlan.collect { - case StreamingExecutionRelation(source: KafkaSource, _) => source - } ++ (query.get.lastExecution match { - case null => Seq() - case e => e.logical.collect { - case DataSourceV2Relation(_, reader: KafkaContinuousReader) => reader - } - }) + case StreamingExecutionRelation(source, _) if source.isInstanceOf[KafkaSource] => + source.asInstanceOf[KafkaSource] + } if (sources.isEmpty) { throw new Exception( "Could not find Kafka source in the StreamExecution logical plan to add data to") @@ -151,158 +137,14 @@ abstract class KafkaSourceTest extends StreamTest with SharedSQLContext { override def toString: String = s"AddKafkaData(topics = $topics, data = $data, message = $message)" } - - private val topicId = new AtomicInteger(0) - protected def newTopic(): String = s"topic-${topicId.getAndIncrement()}" } -class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { - - import testImplicits._ - - test("(de)serialization of initial offsets") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 5) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("subscribe", topic) - - testStream(reader.load)( - makeSureGetOffsetCalled, - StopStream, - StartStream(), - StopStream) - } - - test("maxOffsetsPerTrigger") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 3) - testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) - testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) - testUtils.sendMessages(topic, Array("1"), Some(2)) - - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("maxOffsetsPerTrigger", 10) - .option("subscribe", topic) - .option("startingOffsets", "earliest") - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) - - val clock = new StreamManualClock - - val waitUntilBatchProcessed = AssertOnQuery { q => - eventually(Timeout(streamingTimeout)) { - if (!q.exception.isDefined) { - assert(clock.isStreamWaitingAt(clock.getTimeMillis())) - } - } - if (q.exception.isDefined) { - throw q.exception.get - } - true - } - - testStream(mapped)( - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // 1 from smallest, 1 from middle, 8 from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 - ), - StopStream, - StartStream(ProcessingTime(100), clock), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, - 12, 117, 118, 119, 120, 121, 122, 123, 124, 125 - ), - AdvanceManualClock(100), - waitUntilBatchProcessed, - // smallest now empty, 1 more from middle, 9 more from biggest - CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, - 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, - 12, 117, 118, 119, 120, 121, 122, 123, 124, 125, - 13, 126, 127, 128, 129, 130, 131, 132, 133, 134 - ) - ) - } - - test("input row metrics") { - val topic = newTopic() - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - - val kafka = spark - .readStream - .format("kafka") - .option("subscribe", topic) - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - - val mapped = kafka.map(kv => kv._2.toInt + 1) - testStream(mapped)( - StartStream(trigger = ProcessingTime(1)), - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - AssertOnQuery { query => - val recordsRead = query.recentProgress.map(_.numInputRows).sum - recordsRead == 3 - } - ) - } - - test("subscribing topic by pattern with topic deletions") { - val topicPrefix = newTopic() - val topic = topicPrefix + "-seems" - val topic2 = topicPrefix + "-bad" - testUtils.createTopic(topic, partitions = 5) - testUtils.sendMessages(topic, Array("-1")) - require(testUtils.getLatestOffsets(Set(topic)).size === 5) - val reader = spark - .readStream - .format("kafka") - .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", s"$topicPrefix-.*") - .option("failOnDataLoss", "false") +class KafkaSourceSuite extends KafkaSourceTest { - val kafka = reader.load() - .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") - .as[(String, String)] - val mapped = kafka.map(kv => kv._2.toInt + 1) + import testImplicits._ - testStream(mapped)( - makeSureGetOffsetCalled, - AddKafkaData(Set(topic), 1, 2, 3), - CheckAnswer(2, 3, 4), - Assert { - testUtils.deleteTopic(topic) - testUtils.createTopic(topic2, partitions = 5) - true - }, - AddKafkaData(Set(topic2), 4, 5, 6), - CheckAnswer(2, 3, 4, 5, 6, 7) - ) - } + private val topicId = new AtomicInteger(0) testWithUninterruptibleThread( "deserialization of initial offset with Spark 2.1.0") { @@ -395,51 +237,86 @@ class KafkaMicroBatchSourceSuite extends KafkaSourceSuiteBase { } } - test("KafkaSource with watermark") { - val now = System.currentTimeMillis() + test("(de)serialization of initial offsets") { val topic = newTopic() - testUtils.createTopic(newTopic(), partitions = 1) - testUtils.sendMessages(topic, Array(1).map(_.toString)) + testUtils.createTopic(topic, partitions = 64) - val kafka = spark + val reader = spark .readStream .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) - .option("kafka.metadata.max.age.ms", "1") - .option("startingOffsets", s"earliest") .option("subscribe", topic) - .load() - - val windowedAggregation = kafka - .withWatermark("timestamp", "10 seconds") - .groupBy(window($"timestamp", "5 seconds") as 'window) - .agg(count("*") as 'count) - .select($"window".getField("start") as 'window, $"count") - val query = windowedAggregation - .writeStream - .format("memory") - .outputMode("complete") - .queryName("kafkaWatermark") - .start() - query.processAllAvailable() - val rows = spark.table("kafkaWatermark").collect() - assert(rows.length === 1, s"Unexpected results: ${rows.toList}") - val row = rows(0) - // We cannot check the exact window start time as it depands on the time that messages were - // inserted by the producer. So here we just use a low bound to make sure the internal - // conversion works. - assert( - row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, - s"Unexpected results: $row") - assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") - query.stop() + testStream(reader.load)( + makeSureGetOffsetCalled, + StopStream, + StartStream(), + StopStream) } -} -class KafkaSourceSuiteBase extends KafkaSourceTest { + test("maxOffsetsPerTrigger") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 3) + testUtils.sendMessages(topic, (100 to 200).map(_.toString).toArray, Some(0)) + testUtils.sendMessages(topic, (10 to 20).map(_.toString).toArray, Some(1)) + testUtils.sendMessages(topic, Array("1"), Some(2)) - import testImplicits._ + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("maxOffsetsPerTrigger", 10) + .option("subscribe", topic) + .option("startingOffsets", "earliest") + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped: org.apache.spark.sql.Dataset[_] = kafka.map(kv => kv._2.toInt) + + val clock = new StreamManualClock + + val waitUntilBatchProcessed = AssertOnQuery { q => + eventually(Timeout(streamingTimeout)) { + if (!q.exception.isDefined) { + assert(clock.isStreamWaitingAt(clock.getTimeMillis())) + } + } + if (q.exception.isDefined) { + throw q.exception.get + } + true + } + + testStream(mapped)( + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // 1 from smallest, 1 from middle, 8 from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107), + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116 + ), + StopStream, + StartStream(ProcessingTime(100), clock), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125 + ), + AdvanceManualClock(100), + waitUntilBatchProcessed, + // smallest now empty, 1 more from middle, 9 more from biggest + CheckAnswer(1, 10, 100, 101, 102, 103, 104, 105, 106, 107, + 11, 108, 109, 110, 111, 112, 113, 114, 115, 116, + 12, 117, 118, 119, 120, 121, 122, 123, 124, 125, + 13, 126, 127, 128, 129, 130, 131, 132, 133, 134 + ) + ) + } test("cannot stop Kafka stream") { val topic = newTopic() @@ -451,7 +328,7 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { .format("kafka") .option("kafka.bootstrap.servers", testUtils.brokerAddress) .option("kafka.metadata.max.age.ms", "1") - .option("subscribePattern", s"$topic.*") + .option("subscribePattern", s"topic-.*") val kafka = reader.load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") @@ -545,6 +422,65 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { } } + test("subscribing topic by pattern with topic deletions") { + val topicPrefix = newTopic() + val topic = topicPrefix + "-seems" + val topic2 = topicPrefix + "-bad" + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("-1")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("subscribePattern", s"$topicPrefix-.*") + .option("failOnDataLoss", "false") + + val kafka = reader.load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + val mapped = kafka.map(kv => kv._2.toInt + 1) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(2, 3, 4), + Assert { + testUtils.deleteTopic(topic) + testUtils.createTopic(topic2, partitions = 5) + true + }, + AddKafkaData(Set(topic2), 4, 5, 6), + CheckAnswer(2, 3, 4, 5, 6, 7) + ) + } + + test("starting offset is latest by default") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("0")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val reader = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("subscribe", topic) + + val kafka = reader.load() + .selectExpr("CAST(value AS STRING)") + .as[String] + val mapped = kafka.map(_.toInt) + + testStream(mapped)( + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(1, 2, 3) // should not have 0 + ) + } + test("bad source options") { def testBadOptions(options: (String, String)*)(expectedMsgs: String*): Unit = { val ex = intercept[IllegalArgumentException] { @@ -604,6 +540,34 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { testUnsupportedConfig("kafka.auto.offset.reset", "latest") } + test("input row metrics") { + val topic = newTopic() + testUtils.createTopic(topic, partitions = 5) + testUtils.sendMessages(topic, Array("-1")) + require(testUtils.getLatestOffsets(Set(topic)).size === 5) + + val kafka = spark + .readStream + .format("kafka") + .option("subscribe", topic) + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .load() + .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") + .as[(String, String)] + + val mapped = kafka.map(kv => kv._2.toInt + 1) + testStream(mapped)( + StartStream(trigger = ProcessingTime(1)), + makeSureGetOffsetCalled, + AddKafkaData(Set(topic), 1, 2, 3), + CheckAnswer(2, 3, 4), + AssertOnQuery { query => + val recordsRead = query.recentProgress.map(_.numInputRows).sum + recordsRead == 3 + } + ) + } + test("delete a topic when a Spark job is running") { KafkaSourceSuite.collectedData.clear() @@ -665,6 +629,8 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { } } + private def newTopic(): String = s"topic-${topicId.getAndIncrement()}" + private def assignString(topic: String, partitions: Iterable[Int]): String = { JsonUtils.partitions(partitions.map(p => new TopicPartition(topic, p))) } @@ -710,10 +676,6 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { testStream(mapped)( makeSureGetOffsetCalled, - Execute { q => - // wait to reach the last offset in every partition - q.awaitOffset(0, KafkaSourceOffset(partitionOffsets.mapValues(_ => 3L))) - }, CheckAnswer(-20, -21, -22, 0, 1, 2, 11, 12, 22), StopStream, StartStream(), @@ -744,7 +706,6 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { .format("memory") .outputMode("append") .queryName("kafkaColumnTypes") - .trigger(defaultTrigger) .start() query.processAllAvailable() val rows = spark.table("kafkaColumnTypes").collect() @@ -762,6 +723,47 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { query.stop() } + test("KafkaSource with watermark") { + val now = System.currentTimeMillis() + val topic = newTopic() + testUtils.createTopic(newTopic(), partitions = 1) + testUtils.sendMessages(topic, Array(1).map(_.toString)) + + val kafka = spark + .readStream + .format("kafka") + .option("kafka.bootstrap.servers", testUtils.brokerAddress) + .option("kafka.metadata.max.age.ms", "1") + .option("startingOffsets", s"earliest") + .option("subscribe", topic) + .load() + + val windowedAggregation = kafka + .withWatermark("timestamp", "10 seconds") + .groupBy(window($"timestamp", "5 seconds") as 'window) + .agg(count("*") as 'count) + .select($"window".getField("start") as 'window, $"count") + + val query = windowedAggregation + .writeStream + .format("memory") + .outputMode("complete") + .queryName("kafkaWatermark") + .start() + query.processAllAvailable() + val rows = spark.table("kafkaWatermark").collect() + assert(rows.length === 1, s"Unexpected results: ${rows.toList}") + val row = rows(0) + // We cannot check the exact window start time as it depands on the time that messages were + // inserted by the producer. So here we just use a low bound to make sure the internal + // conversion works. + assert( + row.getAs[java.sql.Timestamp]("window").getTime >= now - 5 * 1000, + s"Unexpected results: $row") + assert(row.getAs[Int]("count") === 1, s"Unexpected results: $row") + query.stop() + } + private def testFromLatestOffsets( topic: String, addPartitions: Boolean, @@ -798,7 +800,9 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { AddKafkaData(Set(topic), 7, 8), CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), AssertOnQuery("Add partitions") { query: StreamExecution => - if (addPartitions) setTopicPartitions(topic, 10, query) + if (addPartitions) { + testUtils.addPartitions(topic, 10) + } true }, AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), @@ -839,7 +843,9 @@ class KafkaSourceSuiteBase extends KafkaSourceTest { StartStream(), CheckAnswer(2, 3, 4, 5, 6, 7, 8, 9), AssertOnQuery("Add partitions") { query: StreamExecution => - if (addPartitions) setTopicPartitions(topic, 10, query) + if (addPartitions) { + testUtils.addPartitions(topic, 10) + } true }, AddKafkaData(Set(topic), 9, 10, 11, 12, 13, 14, 15, 16), @@ -971,23 +977,6 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared } } - protected def startStream(ds: Dataset[Int]) = { - ds.writeStream.foreach(new ForeachWriter[Int] { - - override def open(partitionId: Long, version: Long): Boolean = { - true - } - - override def process(value: Int): Unit = { - // Slow down the processing speed so that messages may be aged out. - Thread.sleep(Random.nextInt(500)) - } - - override def close(errorOrNull: Throwable): Unit = { - } - }).start() - } - test("stress test for failOnDataLoss=false") { val reader = spark .readStream @@ -1001,7 +990,20 @@ class KafkaSourceStressForDontFailOnDataLossSuite extends StreamTest with Shared val kafka = reader.load() .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") .as[(String, String)] - val query = startStream(kafka.map(kv => kv._2.toInt)) + val query = kafka.map(kv => kv._2.toInt).writeStream.foreach(new ForeachWriter[Int] { + + override def open(partitionId: Long, version: Long): Boolean = { + true + } + + override def process(value: Int): Unit = { + // Slow down the processing speed so that messages may be aged out. + Thread.sleep(Random.nextInt(500)) + } + + override def close(errorOrNull: Throwable): Unit = { + } + }).start() val testTime = 1.minutes val startTime = System.currentTimeMillis() http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala index b714a46..e8d683a 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameReader.scala @@ -191,9 +191,6 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { ds = ds.asInstanceOf[DataSourceV2], conf = sparkSession.sessionState.conf)).asJava) - // Streaming also uses the data source V2 API. So it may be that the data source implements - // v2, but has no v2 implementation for batch reads. In that case, we fall back to loading - // the dataframe as a v1 source. val reader = (ds, userSpecifiedSchema) match { case (ds: ReadSupportWithSchema, Some(schema)) => ds.createReader(schema, options) @@ -211,30 +208,23 @@ class DataFrameReader private[sql](sparkSession: SparkSession) extends Logging { } reader - case _ => null // fall back to v1 + case _ => + throw new AnalysisException(s"$cls does not support data reading.") } - if (reader == null) { - loadV1Source(paths: _*) - } else { - Dataset.ofRows(sparkSession, DataSourceV2Relation(reader)) - } + Dataset.ofRows(sparkSession, DataSourceV2Relation(reader)) } else { - loadV1Source(paths: _*) + // Code path for data source v1. + sparkSession.baseRelationToDataFrame( + DataSource.apply( + sparkSession, + paths = paths, + userSpecifiedSchema = userSpecifiedSchema, + className = source, + options = extraOptions.toMap).resolveRelation()) } } - private def loadV1Source(paths: String*) = { - // Code path for data source v1. - sparkSession.baseRelationToDataFrame( - DataSource.apply( - sparkSession, - paths = paths, - userSpecifiedSchema = userSpecifiedSchema, - className = source, - options = extraOptions.toMap).resolveRelation()) - } - /** * Construct a `DataFrame` representing the database table accessible via JDBC URL * url named table and connection properties. http://git-wip-us.apache.org/repos/asf/spark/blob/55dbfbca/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala index 97f12ff..3304f36 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/DataFrameWriter.scala @@ -255,24 +255,17 @@ final class DataFrameWriter[T] private[sql](ds: Dataset[T]) { } } - // Streaming also uses the data source V2 API. So it may be that the data source implements - // v2, but has no v2 implementation for batch writes. In that case, we fall back to saving - // as though it's a V1 source. - case _ => saveToV1Source() + case _ => throw new AnalysisException(s"$cls does not support data writing.") } } else { - saveToV1Source() - } - } - - private def saveToV1Source(): Unit = { - // Code path for data source v1. - runCommand(df.sparkSession, "save") { - DataSource( - sparkSession = df.sparkSession, - className = source, - partitionColumns = partitioningColumns.getOrElse(Nil), - options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan)) + // Code path for data source v1. + runCommand(df.sparkSession, "save") { + DataSource( + sparkSession = df.sparkSession, + className = source, + partitionColumns = partitioningColumns.getOrElse(Nil), + options = extraOptions.toMap).planForWriting(mode, AnalysisBarrier(df.logicalPlan)) + } } } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org For additional commands, e-mail: commits-h...@spark.apache.org