This doc is unrelated to the stream-stream join we added in Structured Streaming. :)

That said, we added append mode first because it is easier to reason about the semantics of append mode, especially in the context of outer joins: you output a row only when you know it won't ever be changed. The semantics of update mode in outer joins are trickier to reason about and expose through the APIs. Consider a left outer join. As soon as we get a left-side record with a key K that does not have a match, do we output *(K, leftValue, null)*? And if we do so, and then later get 2 matches from the right side, we have to output *(K, leftValue, rightValue1)* and *(K, leftValue, rightValue2)*. But how do we convey that *rightValue1* and *rightValue2* together replace the earlier *null*, rather than *rightValue2* replacing *rightValue1* replacing *null*? We will figure these out in future releases. For now, we have released append mode, which allows quite a large range of use cases, including multiple cascading joins.
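For reference, here is a rough sketch of what an append-mode stream-stream left outer join looks like with the 2.3 APIs (the topics, column names, watermark delays, and time bounds below are made up for illustration):

import org.apache.spark.sql.functions.expr

// Two hypothetical Kafka-backed streams with event-time columns.
val impressions = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "impressions").load()
  .selectExpr("CAST(value AS STRING) AS adId", "timestamp AS impressionTime")

val clicks = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "clicks").load()
  .selectExpr("CAST(value AS STRING) AS clickAdId", "timestamp AS clickTime")

// Watermarks plus a time-range join condition bound the state. Once the
// watermark passes the range, the engine knows an unmatched impression can
// never get a match, so it emits (K, leftValue, null) exactly once.
val joined = impressions.withWatermark("impressionTime", "2 hours")
  .join(
    clicks.withWatermark("clickTime", "3 hours"),
    expr("""
      clickAdId = adId AND
      clickTime >= impressionTime AND
      clickTime <= impressionTime + interval 1 hour"""),
    joinType = "leftOuter")

Update mode would additionally have to retract or amend that null row when late matches arrive, which is exactly the ambiguity described above.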
TD

On Thu, Mar 8, 2018 at 9:18 AM, Gourav Sengupta <gourav.sengu...@gmail.com> wrote:

> super interesting.
>
> On Wed, Mar 7, 2018 at 11:44 AM, kant kodali <kanth...@gmail.com> wrote:
>
>> It looks to me that the StateStore described in this doc
>> <https://docs.google.com/document/d/1i528WI7KFica0Dg1LTQfdQMsW8ai3WDvHmUvkH1BKg4/edit>
>> actually has a full outer join, and every other join is a filter of that.
>> Also, the doc talks about update mode, but it looks like Spark 2.3 ended
>> up with append mode? Anyway, the moment it is in master I am ready to
>> test, so JIRA tickets on this would help to keep track. Please let me know.
>>
>> Thanks!
>>
>> On Tue, Mar 6, 2018 at 9:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> Sorry, I meant Spark 2.4 in my previous email.
>>>
>>> On Tue, Mar 6, 2018 at 9:15 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> Hi TD,
>>>>
>>>> I agree; I think we are better off with either a full fix or no fix. I
>>>> am OK with the complete fix being available in master or some branch; I
>>>> guess the solution for me is to just build from source.
>>>>
>>>> On a similar note, I am not finding any JIRA tickets related to full
>>>> outer joins and update mode for, say, Spark 2.3. I wonder how hard it is
>>>> to implement both of these? It turns out that update mode and full outer
>>>> join are very useful and required in my case; therefore, I'm just asking.
>>>>
>>>> Thanks!
>>>>
>>>> On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das <
>>>> tathagata.das1...@gmail.com> wrote:
>>>>
>>>>> I thought about it.
>>>>> I am not 100% sure whether this fix should go into 2.3.1.
>>>>>
>>>>> There are two parts to this bug fix to enable self-joins.
>>>>>
>>>>> 1. Enabling deduping of leaf logical nodes by extending
>>>>> MultiInstanceRelation.
>>>>> - This is safe to backport into the 2.3 branch, as it does not touch
>>>>> production code paths.
>>>>>
>>>>> 2. Fixing attribute rewriting in MicroBatchExecution, when the
>>>>> micro-batch plan is spliced into the streaming plan.
>>>>> - This touches core production code paths and therefore may not be
>>>>> safe to backport.
>>>>>
>>>>> Part 1 enables self-joins in all but a small fraction of self-join
>>>>> queries. That small fraction can produce incorrect results, and part 2
>>>>> avoids that.
>>>>>
>>>>> So for 2.3.1, we can enable self-joins by merging only part 1, but it
>>>>> can give wrong results in some cases. I think that is strictly worse
>>>>> than no fix.
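>>>>>
>>>>> (For those curious: the analyzer hook that part 1 extends looks roughly
>>>>> like the sketch below. This is a paraphrase from memory, not the exact
>>>>> source; the real trait lives in org.apache.spark.sql.catalyst.analysis.)
>>>>>
>>>>> trait MultiInstanceRelation {
>>>>>   // Called by the analyzer when the same leaf relation appears more
>>>>>   // than once in a plan; returns a copy with fresh attribute ids so
>>>>>   // the two sides of a self-join stop sharing expression ids.
>>>>>   def newInstance(): LogicalPlan
>>>>> }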
>>>>>
>>>>> TD
>>>>>
>>>>> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>
>>>>>> Hi TD,
>>>>>>
>>>>>> I pulled your commit that is listed on this ticket
>>>>>> https://issues.apache.org/jira/browse/SPARK-23406; specifically, I did
>>>>>> the following steps, and self-joins work after I cherry-pick your
>>>>>> commit! Good job! I was hoping it would be part of 2.3.0, but it looks
>>>>>> like it is targeted for 2.3.1 :(
>>>>>>
>>>>>> git clone https://github.com/apache/spark.git
>>>>>> cd spark
>>>>>> git fetch
>>>>>> git checkout branch-2.3
>>>>>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>>>>>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>>>>>> ./build/mvn -DskipTests compile
>>>>>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz \
>>>>>>   -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>>>>>
>>>>>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>>>>>> tathagata.das1...@gmail.com> wrote:
>>>>>>
>>>>>>> Hey,
>>>>>>>
>>>>>>> Thanks for testing out stream-stream joins and reporting this issue.
>>>>>>> I am going to take a look at this.
>>>>>>>
>>>>>>> TD
>>>>>>>
>>>>>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>
>>>>>>>> If I change it to the code below, it works. However, I don't believe
>>>>>>>> this is the solution I am looking for: I want to be able to do it in
>>>>>>>> raw SQL, and moreover, if a user gives a big chained raw Spark SQL
>>>>>>>> join query, I am not even sure how to make copies of the dataframe
>>>>>>>> to achieve the self-join. Is there any other way here?
>>>>>>>>
>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>
>>>>>>>> val jdf = spark.readStream.format("kafka")
>>>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>>>   .option("subscribe", "join_test")
>>>>>>>>   .option("startingOffsets", "earliest").load()
>>>>>>>> val jdf1 = spark.readStream.format("kafka")
>>>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>>>   .option("subscribe", "join_test")
>>>>>>>>   .option("startingOffsets", "earliest").load()
>>>>>>>>
>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>> jdf1.createOrReplaceTempView("table1") // must be "table1" for the query below to resolve
>>>>>>>>
>>>>>>>> val resultdf = spark.sql("select * from table inner join table1 on table.offset = table1.offset")
>>>>>>>>
>>>>>>>> resultdf.writeStream.outputMode("append").format("console")
>>>>>>>>   .option("truncate", false)
>>>>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>
>>>>>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> If I change it to this
>>>>>>>>>
>>>>>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi All,
>>>>>>>>>>
>>>>>>>>>> I have the following code
>>>>>>>>>>
>>>>>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>>>>>
>>>>>>>>>> val jdf = spark.readStream.format("kafka")
>>>>>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>>>>>   .option("subscribe", "join_test")
>>>>>>>>>>   .option("startingOffsets", "earliest").load()
>>>>>>>>>>
>>>>>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>>>>>
>>>>>>>>>> val resultdf = spark.sql("select * from table as x inner join table as y on x.offset = y.offset")
>>>>>>>>>>
>>>>>>>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>>>>>>>   .option("truncate", false)
>>>>>>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>>>>>
>>>>>>>>>> and I get the following exception:
>>>>>>>>>>
>>>>>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`'
>>>>>>>>>> given input columns: [x.value, x.offset, x.key, x.timestampType,
>>>>>>>>>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>>>>>> 'Project [*]
>>>>>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>>>>>    :- SubqueryAlias x
>>>>>>>>>>    :  +- SubqueryAlias table
>>>>>>>>>>    :     +- StreamingRelation
>>>>>>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>>>>>> -> earliest, subscribe -> join_test, kafka.bootstrap.servers ->
>>>>>>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30,
>>>>>>>>>> partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>    +- SubqueryAlias y
>>>>>>>>>>       +- SubqueryAlias table
>>>>>>>>>>          +- StreamingRelation
>>>>>>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>>>>>> -> earliest, subscribe -> join_test, kafka.bootstrap.servers ->
>>>>>>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30,
>>>>>>>>>> partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>>>>>>
>>>>>>>>>> any idea what's wrong here?
>>>>>>>>>>
>>>>>>>>>> Thanks!
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>