I thought about it. I am not 100% sure whether this fix should go into 2.3.1.
There are two parts to this bug fix to enable self-joins.

1. Enabling deduplication of leaf logical nodes by extending
MultiInstanceRelation. This is safe to backport into the 2.3 branch, as it
does not touch production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan. This touches core production code
paths and therefore may not be safe to backport.

Part 1 (see the sketch below) enables self-joins in all but a small fraction
of self-join queries. That small fraction can produce incorrect results, and
part 2 avoids that. So for 2.3.1, we could enable self-joins by merging only
part 1, but it could give wrong results in some cases. I think that is
strictly worse than no fix.

TD
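To make this concrete, here is a minimal sketch of the idea behind part 1,
assuming a simplified leaf node (MyStreamingRelation is hypothetical; the
actual patch touches Spark's streaming source relations):

import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LeafNode

// A streaming leaf node that the analyzer can deduplicate when it
// appears on both sides of a self-join.
case class MyStreamingRelation(output: Seq[Attribute])
  extends LeafNode with MultiInstanceRelation {

  // The analyzer calls this when the same relation occurs twice in a
  // plan; a copy with fresh expression IDs lets x.offset and y.offset
  // resolve to distinct attributes.
  override def newInstance(): MyStreamingRelation =
    copy(output = output.map(_.newInstance()))
}

And a rough sketch of what part 2 amounts to (the names here are
illustrative, not the actual MicroBatchExecution code): when the per-trigger
batch plan is spliced in place of the streaming relation, references
elsewhere in the plan must be rewritten from the old attributes to the new
ones:

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan

// oldOutput: attributes of the streaming relation in the analyzed plan;
// newOutput: attributes of the batch relation substituted for this trigger.
def rewriteAttributes(plan: LogicalPlan,
                      oldOutput: Seq[Attribute],
                      newOutput: Seq[Attribute]): LogicalPlan = {
  val replacements = AttributeMap(oldOutput.zip(newOutput))
  plan.transformAllExpressions {
    case a: Attribute => replacements.getOrElse(a, a)
  }
}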
On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket
> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the
> following steps, and self-joins work after I cherry-picked your commit!
> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
> targeted for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr \
>   -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> If I change it to the code below, it works. However, I don't believe
>>> this is the solution I am looking for. I want to be able to do it in raw
>>> SQL, and moreover, if a user gives a big chained raw Spark SQL join
>>> query, I am not even sure how to make copies of the DataFrame to achieve
>>> the self-join. Is there any other way here?
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = spark.readStream.format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "join_test")
>>>   .option("startingOffsets", "earliest").load()
>>> val jdf1 = spark.readStream.format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "join_test")
>>>   .option("startingOffsets", "earliest").load()
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1") // second view for the self-join
>>>
>>> val resultdf = spark.sql(
>>>   "select * from table inner join table1 on table.offset = table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console")
>>>   .option("truncate", false)
>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> If I change it to this
>>>>
>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code:
>>>>>
>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>
>>>>> val jdf = spark.readStream.format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>   .option("subscribe", "join_test")
>>>>>   .option("startingOffsets", "earliest").load()
>>>>>
>>>>> jdf.createOrReplaceTempView("table")
>>>>>
>>>>> val resultdf = spark.sql(
>>>>>   "select * from table as x inner join table as y on x.offset = y.offset")
>>>>>
>>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>>   .option("truncate", false)
>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>
>>>>> and I get the following exception:
>>>>>
>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given
>>>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic,
>>>>> x.timestamp, x.partition]; line 1 pos 50;
>>>>> 'Project [*]
>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>    :- SubqueryAlias x
>>>>>    :  +- SubqueryAlias table
>>>>>    :     +- StreamingRelation
>>>>>    :        DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,
>>>>>    :        List(),None,List(),None,Map(startingOffsets -> earliest,
>>>>>    :        subscribe -> join_test, kafka.bootstrap.servers ->
>>>>>    :        localhost:9092),None), kafka, [key#28, value#29, topic#30,
>>>>>    :        partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>    +- SubqueryAlias y
>>>>>       +- SubqueryAlias table
>>>>>          +- StreamingRelation
>>>>>             DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,
>>>>>             List(),None,List(),None,Map(startingOffsets -> earliest,
>>>>>             subscribe -> join_test, kafka.bootstrap.servers ->
>>>>>             localhost:9092),None), kafka, [key#28, value#29, topic#30,
>>>>>             partition#31, offset#32L, timestamp#33, timestampType#34]
>>>>>
>>>>> Any idea what's wrong here?
>>>>>
>>>>> Thanks!
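For reference, once the fix is applied (as verified above by cherry-picking
the commit), the self-join should no longer need two copies of the stream.
A minimal sketch of the same join via the DataFrame API, reusing jdf from
the thread above; expr comes from org.apache.spark.sql.functions:

import org.apache.spark.sql.functions.expr

// One stream, aliased twice. Before the fix this fails with the same
// AnalysisException, because both sides of the join share a single
// StreamingRelation with identical attribute IDs (#28...#34 above).
val selfJoined = jdf.as("x").join(jdf.as("y"), expr("x.offset = y.offset"))

selfJoined.writeStream.outputMode("append").format("console").start()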