Memory leak while caching in foreachBatch block
Hi, we have a structured streaming application, and we are facing a memory leak when caching inside the foreachBatch block. We unpersist on every iteration, and we also verify via "spark.sparkContext.getPersistentRDDs" that no unnecessary cached data remains. We also noticed in the profiler that many SparkSession objects are created when we use cache (vs. 2 SparkSession objects when we don't). Any idea what could cause this? We use Spark 3.1.1.
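For reference, this is roughly the pattern we use inside foreachBatch (a minimal sketch; inputDf, the paths, and the second write are simplified placeholders):

    import org.apache.spark.sql.DataFrame

    val query = inputDf.writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        batchDf.cache() // cached because the micro-batch feeds two writes
        batchDf.write.mode("append").parquet("/out/sinkA")
        batchDf.groupBy("key").count().write.mode("append").parquet("/out/sinkB")
        batchDf.unpersist() // released at the end of every iteration
      }
      .start()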
Partial data with ADLS Gen2
I have a Spark batch application writing to ADLS Gen2 (hierarchical namespace). When designing the application I assumed Spark would perform a global commit once the job commits, but what it actually does is commit per task, i.e. *once a task completes writing, its output is moved from the temp location to the target storage*. So if the batch fails we are left with partial data, and on retry we get duplicate data. Our scale is really huge, so rolling back (deleting the data) is not an option for us; the search would take a lot of time. Is there any built-in solution, something we can use out of the box? Thanks.
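For context, the behavior described above matches the Hadoop FileOutputCommitter v2 algorithm, which publishes task output at task commit. A minimal sketch of forcing the v1 algorithm instead (rename deferred to job commit), assuming the job writes through the default file committer:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder()
      .appName("adls-batch-writer") // illustrative name
      // With v1, completed task output stays under _temporary until job commit,
      // so a failed job does not publish partial files into the target directory.
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
      .getOrCreate()

The trade-off is a slower, driver-side job commit, which can itself become significant at very large scale.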
Stopping streaming after the write commit and before the read commit?
Hi, what is the expected behavior if the streaming query is stopped after the write commit but before the read commit? Should I expect data duplication? Thanks.
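For context, a sketch of the idempotent-sink pattern this question bears on; alreadyCommitted and markCommitted are hypothetical helpers backed by the sink's own commit log:

    import org.apache.spark.sql.DataFrame

    inputDf.writeStream
      .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        // batchId is stable across restarts, so a batch replayed after a stop
        // between the write commit and the read commit is written only once.
        if (!alreadyCommitted(batchId)) { // hypothetical lookup in the sink's commit log
          batchDf.write.mode("append").parquet("/out/data")
          markCommitted(batchId) // hypothetical
        }
      }
      .start()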
Why is planInputPartitions called multiple times in a micro-batch?
Hi, I'm developing a new Spark connector using the Data Source V2 API (Spark 3.1.1). I noticed that the planInputPartitions method (in MicroBatchStream) is called twice every micro-batch. What is the motivation/reason for this? Thanks, Kineret
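To make the behavior visible, I count invocations in a stripped-down stream. A simplified sketch (CountingMicroBatchStream and MyOffset are illustrative stand-ins for my connector's classes, and the reader factory is injected, since only the counter matters here):

    import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
    import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}

    class MyOffset(val value: Long) extends Offset {
      override def json(): String = value.toString
    }

    class CountingMicroBatchStream(readerFactory: PartitionReaderFactory)
        extends MicroBatchStream {

      private var calls = 0

      override def initialOffset(): Offset = new MyOffset(0L)
      override def latestOffset(): Offset = new MyOffset(System.currentTimeMillis())
      override def deserializeOffset(json: String): Offset = new MyOffset(json.toLong)

      override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
        calls += 1
        // This line prints twice per micro-batch in my runs.
        println(s"planInputPartitions call #$calls for [$start, $end)")
        Array.empty[InputPartition]
      }

      override def createReaderFactory(): PartitionReaderFactory = readerFactory
      override def commit(end: Offset): Unit = ()
      override def stop(): Unit = ()
    }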
How to implement "getPreferredLocations" in Data source v2?
Hi, I would like to support data locality in Spark Data Source V2. How can I give Spark the ability to read and process data on the same node? I didn't find any interface that supports 'getPreferredLocations' (or an equivalent). Thanks!
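For reference, in the Spark 3.x DSv2 read API the locality hint lives on the partition itself rather than on a getPreferredLocations method: InputPartition exposes preferredLocations(). A minimal sketch (MyShardPartition and its host field are illustrative; a real connector would derive the host from the storage system's block placement):

    import org.apache.spark.sql.connector.read.InputPartition

    // A partition that reports the host holding its data; Spark wires this
    // into the scheduler the same way it uses RDD.getPreferredLocations.
    case class MyShardPartition(shardId: Int, host: String) extends InputPartition {
      override def preferredLocations(): Array[String] = Array(host)
    }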
'No plan for EventTimeWatermark' error while using structured streaming with column pruning (spark 2.3.1)
Hi all, I get a 'No plan for EventTimeWatermark' error when running a query with column pruning, using structured streaming with a custom data source that implements Spark Data Source V2.

My data source implementation that handles the schema includes the following:

    class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {

      var schema: StructType = createSchema()

      override def readSchema(): StructType = schema

      override def pruneColumns(requiredSchema: StructType): Unit = {
        this.schema = requiredSchema
      }
    }

and then:

    class MyDataSourceReaderStream extends MyDataSourceReader {
      ...
    }

This is my test code:

    def x(): Unit = {
      val df1 = sparkSession.readStream.format(myV2Source).load()

      val df2 = df1
        .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
        .withWatermark("epoch", "1 milliseconds")
        .groupBy(col("epoch"), col("id")).count()

      val streamingQuery = df2
        .writeStream
        .format("console")
        .trigger(Trigger.ProcessingTime("10 seconds"))
        .outputMode(OutputMode.Append())
        .start()

      streamingQuery.awaitTermination()
    }

I get the following exception:

    Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
    +- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
       +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c
      at scala.Predef$.assert(Predef.scala:170)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
      at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Note that in the logical plan I get *DataSourceV2Relation* and not *StreamingDataSourceV2Relation*, although I use streaming. Would appreciate any help.
How to support writeStream in data source v2 (spark 2.3.1)?
I am writing a Spark Data Source V2 (Spark 2.3) and I want to support writeStream. What should I do in order to support it? My DefaultSource class:

    class MyDefaultSource extends DataSourceV2
      with ReadSupport with WriteSupport with MicroBatchReadSupport {
      ...

Which interface am I missing?
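For reference, a hedged sketch of one candidate: Spark 2.3's StreamWriteSupport interface, whose createStreamWriter method is the streaming-write hook. MyStreamWriter is a hypothetical class extending StreamWriter, and the existing read-side methods are elided:

    import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, StreamWriteSupport}
    import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
    import org.apache.spark.sql.streaming.OutputMode
    import org.apache.spark.sql.types.StructType

    class MyDefaultSource extends DataSourceV2
        with ReadSupport with WriteSupport with MicroBatchReadSupport // as in the original
        with StreamWriteSupport {                                     // streaming-write hook

      override def createStreamWriter(
          queryId: String,
          schema: StructType,
          mode: OutputMode,
          options: DataSourceOptions): StreamWriter =
        new MyStreamWriter(schema, mode, options) // hypothetical StreamWriter implementation

      // existing createReader / createWriter / createMicroBatchReader unchanged
    }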
Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L
I am trying to read a stream using my custom data source (V2, Spark 2.3), and it fails *in the second iteration* with the following exception while reading pruned columns:

    Query [id=xxx, runId=yyy] terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,... 26 more fields != b#1291L

DataFrame creation:

    val df = sparkSession.readStream.format("myV2Source").load("/")
    val df1 = df.filter(df("a") >= "-1").select("b")

Stream execution:

    val streamingQuery = df1
      .writeStream
      .format("console")
      .trigger(Trigger.ProcessingTime("10 seconds"))
      .outputMode(OutputMode.Append())
      .start()

    streamingQuery.awaitTermination()

If I remove the select (i.e. val df1 = df.filter(df("a") >= "-1")), it works fine. Any idea why?
Spark Streaming: schema mismatch using MicroBatchReader with columns pruning
I have the same problem as described in the following StackOverflow question (which nobody has answered): https://stackoverflow.com/questions/51103634/spark-streaming-schema-mismatch-using-microbatchreader-with-columns-pruning Any idea how to solve it (using Spark 2.3)? Thanks, Kineret