Memory leak while caching in foreachBatch block

2022-08-10 Thread kineret M
Hi,

We have a structured streaming application, and we are facing a memory leak
when caching inside the foreachBatch block.

We unpersist on every iteration, and we also verify via
"spark.sparkContext.getPersistentRDDs" that no unnecessary cached data
remains.
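
For reference, here is a minimal sketch of the pattern we use inside
foreachBatch (simplified; the real per-batch logic is elided):

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

def startQuery(spark: SparkSession, df: DataFrame): StreamingQuery =
  df.writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
      val cached = batchDF.cache()
      try {
        val rows = cached.count()          // first of several actions over the cached batch
        // ... more computations over `cached` ...
        println(s"batch $batchId: $rows rows")
      } finally {
        cached.unpersist()                 // released on every iteration
        // sanity check that nothing stays registered:
        println(s"persistent RDDs: ${spark.sparkContext.getPersistentRDDs.size}")
      }
    }
    .start()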

We also noticed in the profiler that many SparkSession objects are created
when we use cache (versus 2 SparkSession objects when we don't).

Any idea what could cause this? We use Spark 3.1.1.


Partial data with ADLS Gen2

2022-07-24 Thread kineret M
I have a Spark batch application writing to ADLS Gen2 (hierarchical namespace).
When designing the application I assumed Spark would perform a global commit
once the job is committed, but what it actually does is commit per task,
meaning *once a task finishes writing, its output is moved from the temp
location to the target storage*. So if the batch fails we are left with
partial data, and on retry we get duplicates.
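
If it helps frame the question: my understanding (which may well be wrong) is
that this matches the "version 2" behavior of the Hadoop FileOutputCommitter,
whereas version 1 only promotes output at job commit. A minimal sketch of
pinning version 1, assuming the classic committer is what ends up being used
with abfss:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("adls-batch-writer")
  // v1: task output stays under _temporary and is promoted only at job commit;
  // v2: task output is moved into the target directory as each task commits.
  .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "1")
  .getOrCreate()

Even then, job commit is a series of renames, so I assume it is still not
fully atomic, hence the question.
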
Our scale is really huge, so rolling back (deleting the data) is not an option
for us; finding the files to delete would take a long time.
Is there any built-in solution, something we can use out of the box?

Thanks.


Stopping streaming after the write commit and before the read commit?

2022-05-18 Thread kineret M
Hi,

What is the expected behavior if the streaming query is stopped after the
write commit but before the read commit? Should I expect data duplication?
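
For background (and I may be conflating things), this is roughly how I look at
the offsets/ and commits/ logs under the checkpoint to see which batch is in
flight (a sketch only; it assumes the checkpoint is on a filesystem I can list,
and the path is illustrative):

// newest batch id recorded in a checkpoint subdirectory (offsets/ or commits/)
def latestBatchId(dir: String): Option[Long] =
  Option(new java.io.File(dir).listFiles()).toSeq.flatten
    .map(_.getName)
    .filter(name => name.nonEmpty && name.forall(_.isDigit))
    .map(_.toLong)
    .reduceOption(_ max _)

val checkpoint = "/path/to/checkpoint"                 // illustrative path
val lastOffset = latestBatchId(s"$checkpoint/offsets")
val lastCommit = latestBatchId(s"$checkpoint/commits")
// lastOffset being ahead of lastCommit would mean the last batch started (and
// possibly wrote to the sink) but was never marked committed.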

Thanks.


Why planInputPartitions is called multiple times in a micro-batch?

2021-07-12 Thread kineret M
Hi,

I'm developing a new Spark connector using the data source v2 API (Spark
3.1.1). I noticed that the planInputPartitions method (in MicroBatchStream) is
called twice in every micro-batch.
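
For reference, this is roughly how I observe it (an illustrative stub;
everything unrelated to the call count is stubbed out):

import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}

class MyMicroBatchStream extends MicroBatchStream {
  private val planCalls = new AtomicInteger(0)

  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] = {
    // this prints twice per micro-batch:
    println(s"planInputPartitions call #${planCalls.incrementAndGet()} for [$start, $end]")
    Array.empty[InputPartition]
  }

  // the rest of the connector is irrelevant to the question, so stubbed out:
  override def createReaderFactory(): PartitionReaderFactory = ???
  override def latestOffset(): Offset = ???
  override def initialOffset(): Offset = ???
  override def deserializeOffset(json: String): Offset = ???
  override def commit(end: Offset): Unit = ()
  override def stop(): Unit = ()
}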

What is the motivation/reason for this?

Thanks,
Kineret


How to implement "getPreferredLocations" in Data source v2?

2020-01-18 Thread kineret M
Hi,
I would like to support data locality in Spark data source v2. How can I let
Spark read and process data on the node where that data resides?

I didn't find any interface that supports 'getPreferredLocations' (or
equivalent).
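
For context, what I have in mind is the RDD-level hook, roughly like this
(an illustrative sketch only; the hostname is hypothetical):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD

class LocalityAwareRDD(sc: SparkContext) extends RDD[String](sc, Nil) {
  override protected def getPartitions: Array[Partition] =
    Array(new Partition { override def index: Int = 0 })

  override def compute(split: Partition, context: TaskContext): Iterator[String] =
    Iterator.empty

  // tells the scheduler which hosts hold this partition's data
  override protected def getPreferredLocations(split: Partition): Seq[String] =
    Seq("data-node-1.example.com")   // hypothetical hostname
}

I'm looking for the equivalent hook on the data source v2 side.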

Thanks!


'No plan for EventTimeWatermark' error while using structured streaming with column pruning (spark 2.3.1)

2019-04-24 Thread kineret M
Hi All,

I get a 'No plan for EventTimeWatermark' error when running a query with
column pruning in structured streaming, using a custom data source that
implements Spark data source v2.

The part of my data source implementation that handles schemas looks like
this:

class MyDataSourceReader extends DataSourceReader with SupportsPushDownRequiredColumns {

  var schema: StructType = createSchema()

  override def readSchema(): StructType = schema

  override def pruneColumns(requiredSchema: StructType): Unit = {
    this.schema = requiredSchema
  }
}

and then:

class MyDataSourceReaderStream extends MyDataSourceReader { ...

This is my test code:

def x(): Unit = {
  val df1 = sparkSession.readStream.format(myV2Source).load()

  val df2 = df1
    .withColumn("epoch", (round(col("epoch") / (30 * 1000)) * 30).cast(TimestampType))
    .withWatermark("epoch", "1 milliseconds")
    .groupBy(col("epoch"), col("id")).count()

  val streamingQuery = df2
    .writeStream
    .format("console")
    .trigger(Trigger.ProcessingTime("10 seconds"))
    .outputMode(OutputMode.Append())
    .start()

  streamingQuery.awaitTermination()
}

I get the following exception:

Caused by: java.lang.AssertionError: assertion failed: No plan for EventTimeWatermark epoch#201: timestamp, interval 1 milliseconds
+- Project [cast((round((cast(epoch#320L as double) / 3.0), 0) * 30.0) as timestamp) AS epoch#201, id#367L]
   +- DataSourceV2Relation [epoch#320L, id#367L], com.akamai.csi.datacatalog.spark.connectors.endor.v2.reader.stream.EndorDataSourceStreamReader@173b1b7c

  at scala.Predef$.assert(Predef.scala:170)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:93)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:78)
  at org.apache.spark.sql.catalyst.planning.QueryPlanner$$anonfun$2$$anonfun$apply$2.apply(QueryPlanner.scala:75)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.TraversableOnce$$anonfun$foldLeft$1.apply(TraversableOnce.scala:157)
  at scala.collection.Iterator$class.foreach(Iterator.scala:893)
  at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)

Note that in the logical plan I got *DataSourceV2Relation* and not
*StreamingDataSourceV2Relation*, although I use streaming.

I would appreciate any help.


How to support writeStream in data source v2 (spark 2.3.1)?

2019-03-24 Thread kineret M
I'm writing a Spark data source v2 connector in Spark 2.3 and I want to
support writeStream. What do I need to do in order to support it?

My DefaultSource class:

class MyDefaultSource extends DataSourceV2 with ReadSupport
  with WriteSupport with MicroBatchReadSupport { ..

Which interface is missing?
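
From skimming the built-in streaming sinks, I suspect the piece I'm missing is
StreamWriteSupport, roughly as sketched below, but I'd appreciate confirmation
(interface and package names are from memory and may be off for 2.3.1):

import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

class MyDefaultSource extends DataSourceV2 with ReadSupport with WriteSupport
  with MicroBatchReadSupport with StreamWriteSupport {

  // existing batch read/write and micro-batch read members elided ...

  override def createStreamWriter(
      queryId: String,
      schema: StructType,
      mode: OutputMode,
      options: DataSourceOptions): StreamWriter = {
    ???  // return my StreamWriter implementation here
  }
}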


Spark streaming error - Query terminated with exception: assertion failed: Invalid batch: a#660,b#661L,c#662,d#663,,… 26 more fields != b#1291L

2019-03-21 Thread kineret M
I am trying to read a stream using my custom data source (v2, using Spark
2.3), and it fails *in the second iteration*, while reading the pruned
columns, with the following exception:

Query [id=xxx, runId=yyy] terminated with exception: assertion failed:
Invalid batch: a#660,b#661L,c#662,d#663,,... 26 more fields != b#1291L

DataFrame creation:

val df = sparkSession.readStream.format("myV2Source").load("/")
val df1 = df.filter(df("a") >= "-1").select("b")

Stream execution:

val streamingQuery = df1
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .outputMode(OutputMode.Append())
  .start()

streamingQuery.awaitTermination()

If I remove the select (i.e. val df1 = df.filter(df("a") >= "-1")), it works
fine.

Any idea why?


Spark Streaming: schema mismatch using MicroBatchReader with columns pruning

2019-03-16 Thread kineret M
I have the same problem as described in the following StackOverflow question
(but nobody has answered it):

https://stackoverflow.com/questions/51103634/spark-streaming-schema-mismatch-using-microbatchreader-with-columns-pruning

Any idea how to solve it (using Spark 2.3)?

Thanks,
Kineret