[GitHub] spark pull request #22143: [SPARK-24647][SS] Report KafkaStreamWriter's writ...
GitHub user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211381706

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
         }
       }
     }
    +
    +private[kafka010] case class KafkaWriterCustomMetrics(
    +    minOffset: KafkaSourceOffset,
    +    maxOffset: KafkaSourceOffset) extends CustomMetrics {
    +  override def json(): String = {
    +    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
    +      ("maxOffset" -> parse(maxOffset.json))
    +    compact(render(jsonVal))
    +  }
    +
    +  override def toString: String = json()
    +}
    +
    +private[kafka010] object KafkaWriterCustomMetrics {
    +
    +  import Math.{min, max}
    +
    +  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
    +    val minMax = collate(messages)
    +    KafkaWriterCustomMetrics(minMax._1, minMax._2)
    +  }
    +
    +  private def collate(messages: Array[WriterCommitMessage]):
    --- End diff --

    Thanks, I will rename to something with minMax.

---
To unsubscribe, e-mail: reviews-unsubscr...@spark.apache.org
For additional commands, e-mail: reviews-h...@spark.apache.org
GitHub user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211379697

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
     import scala.collection.JavaConverters._
    +import org.json4s.JsonDSL._
    +import org.json4s.jackson.JsonMethods._
    +
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.Attribute
     import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
    +import org.apache.spark.sql.sources.v2.CustomMetrics
     import org.apache.spark.sql.sources.v2.writer._
    -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
     import org.apache.spark.sql.types.StructType
     /**
      * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
      * don't need to really send one.
      */
    -case object KafkaWriterCommitMessage extends WriterCommitMessage
    +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
    --- End diff --

    I would have to rename the class itself to avoid adding a duplicate class. I would love to do that; I am just not sure the change would be accepted.
GitHub user vackosar commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211379432

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
     private[kafka010] abstract class KafkaRowWriter(
         inputSchema: Seq[Attribute], topic: Option[String]) {
    +  import scala.collection.JavaConverters._
    +
    +  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
    +    new ConcurrentHashMap[TopicPartition, Long]().asScala
    --- End diff --

    This map is accessed concurrently from callbacks for different partitions. This can be seen from the call hierarchy and the documentation of Kafka's send method.
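The point about concurrent access can be illustrated with a minimal sketch. This is not the PR's code: `TopicPartitionKey`, `MinOffsetSketch`, `record` and `minOffset` are hypothetical names, and the thread pool only simulates acknowledgements arriving on threads other than the one that later reads the map. It assumes a running minimum per partition is what needs to be tracked.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}
import java.util.function.BiFunction

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition,
// so the sketch runs without the Kafka client on the classpath.
final case class TopicPartitionKey(topic: String, partition: Int)

object MinOffsetSketch {
  // Smallest acknowledged offset seen so far, per partition.
  private val minOffsets = new ConcurrentHashMap[TopicPartitionKey, java.lang.Long]()

  // Keeps the smaller of the stored offset and the newly reported one.
  private val keepSmaller = new BiFunction[java.lang.Long, java.lang.Long, java.lang.Long] {
    override def apply(a: java.lang.Long, b: java.lang.Long): java.lang.Long =
      java.lang.Long.valueOf(math.min(a.longValue, b.longValue))
  }

  // Safe to call from several threads: ConcurrentHashMap.merge is atomic per key.
  def record(tp: TopicPartitionKey, offset: Long): Unit = {
    minOffsets.merge(tp, java.lang.Long.valueOf(offset), keepSmaller)
    ()
  }

  def minOffset(tp: TopicPartitionKey): Option[Long] =
    Option(minOffsets.get(tp)).map(_.longValue)

  def main(args: Array[String]): Unit = {
    // Simulate acknowledgements arriving on several threads.
    val pool = Executors.newFixedThreadPool(4)
    val tp = TopicPartitionKey("events", 0)
    (1 to 100).foreach { i =>
      pool.execute(new Runnable {
        override def run(): Unit = record(tp, i.toLong)
      })
    }
    pool.shutdown()
    pool.awaitTermination(10, TimeUnit.SECONDS)
    println(minOffset(tp))
  }
}
```

With a plain mutable map, the read-compare-write in `record` could lose an update when two threads report offsets for the same partition at once; `merge` avoids that without an explicit lock.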
GitHub user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211339535

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaWriteTask.scala ---
    @@ -61,12 +63,30 @@ private[kafka010] class KafkaWriteTask(
     private[kafka010] abstract class KafkaRowWriter(
         inputSchema: Seq[Attribute], topic: Option[String]) {
    +  import scala.collection.JavaConverters._
    +
    +  protected val minOffsetAccumulator: collection.concurrent.Map[TopicPartition, Long] =
    +    new ConcurrentHashMap[TopicPartition, Long]().asScala
    --- End diff --

    Why is this a concurrent map?
GitHub user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211336988

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -116,3 +133,66 @@ class KafkaStreamDataWriter(
         }
       }
     }
    +
    +private[kafka010] case class KafkaWriterCustomMetrics(
    +    minOffset: KafkaSourceOffset,
    +    maxOffset: KafkaSourceOffset) extends CustomMetrics {
    +  override def json(): String = {
    +    val jsonVal = ("minOffset" -> parse(minOffset.json)) ~
    +      ("maxOffset" -> parse(maxOffset.json))
    +    compact(render(jsonVal))
    +  }
    +
    +  override def toString: String = json()
    +}
    +
    +private[kafka010] object KafkaWriterCustomMetrics {
    +
    +  import Math.{min, max}
    +
    +  def apply(messages: Array[WriterCommitMessage]): KafkaWriterCustomMetrics = {
    +    val minMax = collate(messages)
    +    KafkaWriterCustomMetrics(minMax._1, minMax._2)
    +  }
    +
    +  private def collate(messages: Array[WriterCommitMessage]):
    --- End diff --

    It would be good to leave a comment on what this does. It seems to compute the min/max offset per partition? If so, choosing an apt name for the function would make that clearer.
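For readers following the thread, here is a rough sketch of what "min/max offset per partition" across commit messages could look like. The body of `collate` is not shown in the diff, so this is an assumption about its intent and not the PR's implementation. `PartitionKey`, `OffsetRange`, `combine` and `minMaxOffsets` are hypothetical names standing in for the PR's own types.

```scala
// Hypothetical stand-ins; the PR itself uses KafkaSourceOffset and WriterCommitMessage.
final case class PartitionKey(topic: String, partition: Int)
final case class OffsetRange(
    minOffsets: Map[PartitionKey, Long],
    maxOffsets: Map[PartitionKey, Long])

object MinMaxOffsetsSketch {
  // Combines per-task maps into one map, resolving duplicate keys with `pick`.
  private def combine(maps: Seq[Map[PartitionKey, Long]])(
      pick: (Long, Long) => Long): Map[PartitionKey, Long] =
    maps.flatMap(_.toSeq)
      .groupBy(_._1)
      .map { case (key, entries) => key -> entries.map(_._2).reduce(pick) }

  // Smallest minimum and largest maximum offset per partition over all messages.
  def minMaxOffsets(messages: Seq[OffsetRange]): OffsetRange =
    OffsetRange(
      combine(messages.map(_.minOffsets))(_ min _),
      combine(messages.map(_.maxOffsets))(_ max _))

  def main(args: Array[String]): Unit = {
    val p0 = PartitionKey("events", 0)
    val p1 = PartitionKey("events", 1)
    val merged = minMaxOffsets(Seq(
      OffsetRange(Map(p0 -> 5L), Map(p0 -> 9L)),
      OffsetRange(Map(p0 -> 2L, p1 -> 7L), Map(p0 -> 4L, p1 -> 8L))))
    println(merged)
  }
}
```

A name along the lines of `minMaxOffsets` states the result directly, which is the kind of naming the comment above asks for.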
GitHub user arunmahadevan commented on a diff in the pull request:

    https://github.com/apache/spark/pull/22143#discussion_r211336368

    --- Diff: external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaStreamWriter.scala ---
    @@ -19,18 +19,23 @@ package org.apache.spark.sql.kafka010
     import scala.collection.JavaConverters._
    +import org.json4s.JsonDSL._
    +import org.json4s.jackson.JsonMethods._
    +
     import org.apache.spark.sql.catalyst.InternalRow
     import org.apache.spark.sql.catalyst.expressions.Attribute
     import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
    +import org.apache.spark.sql.sources.v2.CustomMetrics
     import org.apache.spark.sql.sources.v2.writer._
    -import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    +import org.apache.spark.sql.sources.v2.writer.streaming.{StreamWriter, SupportsCustomWriterMetrics}
     import org.apache.spark.sql.types.StructType
     /**
      * Dummy commit message. The DataSourceV2 framework requires a commit message implementation but we
      * don't need to really send one.
      */
    -case object KafkaWriterCommitMessage extends WriterCommitMessage
    +case class KafkaWriterCommitMessage(minOffset: KafkaSourceOffset, maxOffset: KafkaSourceOffset)
    --- End diff --

    It's kind of odd that the writer commit message includes a source offset. IMO, it would be better to define a `KafkaSinkOffset` or, if it can be common, something like `KafkaOffsets`.
GitHub user vackosar opened a pull request:

    https://github.com/apache/spark/pull/22143

    [SPARK-24647][SS] Report KafkaStreamWriter's written min and max offs…

    …ets via CustomMetrics.

    ## What changes were proposed in this pull request?

    Report KafkaStreamWriter's written min and max offsets via CustomMetrics. This is important for data lineage projects like Spline.
    Related issue: https://issues.apache.org/jira/browse/SPARK-24647

    ## How was this patch tested?

    Unit tests.

    Please review http://spark.apache.org/contributing.html before opening a pull request.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/AbsaOSS/spark feature/SPARK-24647-kafka-writer-offsets

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/22143.patch

To close this pull request, make a commit to your master/trunk branch with (at least) the following in the commit message:

    This closes #22143

commit 767a0156a1acef515974d48bbcb6cfdb17f68d90
Author: Kosar, Vaclav: Functions Transformation
Date: 2018-08-17T13:31:57Z

    [SPARK-24647][SS] Report KafkaStreamWriter's written min and max offsets via CustomMetrics.