[ 
https://issues.apache.org/jira/browse/SPARK-30634?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dongjoon Hyun resolved SPARK-30634.
-----------------------------------
    Resolution: Invalid

Hi, [~yurkao]. Sorry, but JIRA is not for Q&A. Please send the question to the 
dev mailing list.

> Delta Merge and Arbitrary Stateful Processing in Structured streaming  
> (foreachBatch)
> -------------------------------------------------------------------------------------
>
>                 Key: SPARK-30634
>                 URL: https://issues.apache.org/jira/browse/SPARK-30634
>             Project: Spark
>          Issue Type: Question
>          Components: Examples, Spark Core, Structured Streaming
>    Affects Versions: 2.4.3
>         Environment: Spark 2.4.3 (scala 2.11.12)
> Delta: 0.5.0
> Java(TM) SE Runtime Environment (build 1.8.0_91-b14)
> OS: Ubuntu 18.04 LTS
>  
>            Reporter: Yurii Oleynikov
>            Priority: Trivial
>         Attachments: Capture1.PNG
>
>
> Hi,
> I have an application that performs arbitrary stateful processing in 
> Structured Streaming and uses delta.merge to update a Delta table, and I have 
> run into strange behaviour:
> 1. Log statements inside my implementation of 
> {{MapGroupsWithStateFunction}}/{{FlatMapGroupsWithStateFunction}} are 
> output twice.
> 2. While looking for the root cause, I also found that the number of state 
> rows reported by Spark doubles.
>  
> I thought there might be a bug in my code, so I went back to 
> {{JavaStructuredSessionization}} from the Apache Spark examples and changed it 
> slightly. I still got the same result.
> The problem happens only if I do not call batchDf.persist() inside 
> foreachBatch.
> {code:java}
> StreamingQuery query = sessionUpdates
>         .writeStream()
>         .outputMode("update")
>         .foreachBatch((VoidFunction2<Dataset<SessionUpdate>, Long>) (batchDf, v2) -> {
>             // Without persisting batchDf, the following merge doubles the number of
>             // Spark state rows and causes MapGroupsWithStateFunction to log twice.
>             deltaTable.as("sessions").merge(batchDf.toDF().as("updates"), mergeExpr)
>                     .whenNotMatched().insertAll()
>                     .whenMatched()
>                     .updateAll()
>                     .execute();
>         })
>         .trigger(Trigger.ProcessingTime(10000))
>         .queryName("ACME")
>         .start(); 
> {code}
> According to 
> [https://docs.databricks.com/_static/notebooks/merge-in-streaming.html] and the 
> [Apache Spark 
> docs|https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#using-foreach-and-foreachbatch],
> there seems to be no need to persist the Dataset/DataFrame inside 
> {{foreachBatch}}.
> Sample code based on the Apache Spark example, with a Delta merge added: 
> [JavaStructuredSessionization with Delta 
> merge|https://github.com/yurkao/delta-merge-sss/blob/master/src/main/java/JavaStructuredSessionization.java]
>  
>  
> I would appreciate your clarification.
>  
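
One possible explanation, not confirmed anywhere in this thread, is that the merge reads its source more than once and the batch passed to foreachBatch is evaluated lazily, so each read re-runs the stateful function unless the batch is persisted first. The sketch below models only that general mechanism in plain Java, without Spark or Delta. All names in it (LazyBatchDemo, twoPassMerge, persisted, countInvocations) are hypothetical, and the two-pass sink is an assumption made for illustration, not a description of what delta.merge actually does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class LazyBatchDemo {

    /** Hypothetical stand-in for a sink that scans its source twice. */
    static int twoPassMerge(Supplier<List<String>> source) {
        int firstPass = source.get().size();   // pass 1: e.g. look for matching rows
        int secondPass = source.get().size();  // pass 2: e.g. write the merged rows
        return firstPass + secondPass;
    }

    /** Wraps a lazy source so it is computed at most once (loosely analogous to persist). */
    static <T> Supplier<T> persisted(Supplier<T> lazy) {
        return new Supplier<T>() {
            private T cached;
            private boolean computed;

            @Override
            public synchronized T get() {
                if (!computed) {
                    cached = lazy.get();
                    computed = true;
                }
                return cached;
            }
        };
    }

    /** Returns how many times the "state function" ran for one simulated micro-batch. */
    static int countInvocations(boolean persist) {
        AtomicInteger calls = new AtomicInteger();
        // The lazy batch: every evaluation re-runs the stateful logic.
        Supplier<List<String>> batch = () -> {
            calls.incrementAndGet(); // stands in for the log line in the state function
            List<String> rows = new ArrayList<>();
            rows.add("session-1");
            rows.add("session-2");
            return rows;
        };
        twoPassMerge(persist ? persisted(batch) : batch);
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println("without persist: " + countInvocations(false));
        System.out.println("with persist: " + countInvocations(true));
    }
}
```

In this model the unpersisted batch is evaluated once per pass and the persisted one only once, which mirrors the doubled log lines described in the report. Whether the same holds for the real query would need to be checked against the Delta and Spark documentation for the versions listed above.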



