I thought about it.
I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending
MultiInstanceRelation
  - This is safe to be backported into the 2.3 branch as it does not touch
production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan.
  - This touches core production code paths and therefore may not be safe to
backport.

Part 1 enables self-joins in all but a small fraction of self-join queries.
That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can
give wrong results in some cases. I think that is strictly worse than no
fix.
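
To illustrate why part 1 matters, here is a toy sketch in plain Scala. It is
not Spark code: Attr, Leaf, leaf and conflictingIds are hypothetical names made
up for this example, and it only assumes that attributes are identified by an
id rather than by name. When both sides of a self-join share the same leaf,
every attribute id appears on both sides; handing one side a fresh instance
removes the overlap.

```scala
// Toy model only: these types are hypothetical and are NOT Spark's classes.
import java.util.concurrent.atomic.AtomicLong

object SelfJoinDedupSketch {
  private val nextId = new AtomicLong(0)

  // An attribute is identified by its id, not by its name.
  final case class Attr(name: String, id: Long)

  // A leaf relation that can return a copy of itself with fresh attribute ids.
  final case class Leaf(output: Seq[Attr]) {
    def newInstance(): Leaf =
      Leaf(output.map(a => a.copy(id = nextId.incrementAndGet())))
  }

  // Build a leaf whose attributes get newly allocated ids.
  def leaf(names: String*): Leaf =
    Leaf(names.map(n => Attr(n, nextId.incrementAndGet())))

  // Ids present on both sides of a join; a non-empty result means that
  // references to those attributes cannot be tied to one side.
  def conflictingIds(left: Leaf, right: Leaf): Set[Long] =
    left.output.map(_.id).toSet intersect right.output.map(_.id).toSet

  def main(args: Array[String]): Unit = {
    val table = leaf("key", "value", "offset")
    println(conflictingIds(table, table).size)               // prints 3
    println(conflictingIds(table, table.newInstance()).size) // prints 0
  }
}
```

This only models the deduping idea from part 1. It says nothing about the
attribute rewriting in part 2.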

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali <kanth...@gmail.com> wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket,
> https://issues.apache.org/jira/browse/SPARK-23406. Specifically, I did the
> following steps, and self-joins work after I cherry-pick your commit! Good
> job! I was hoping it would be part of 2.3.0, but it looks like it is
> targeted for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr \
>   -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali <kanth...@gmail.com> wrote:
>>
>>> If I change it to the code below, it works. However, I don't believe it
>>> is the solution I am looking for. I want to be able to do it in raw SQL,
>>> and moreover, if a user gives a big chained raw Spark SQL join query, I am
>>> not even sure how to make copies of the dataframe to achieve the
>>> self-join. Is there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = spark.readStream.format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "join_test")
>>>   .option("startingOffsets", "earliest")
>>>   .load()
>>> val jdf1 = spark.readStream.format("kafka")
>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>   .option("subscribe", "join_test")
>>>   .option("startingOffsets", "earliest")
>>>   .load()
>>>
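>>> // Register each stream under its own view name; the query joins
>>> // "table" with "table1".
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql(
>>>   "select * from table inner join table1 on table.offset = table1.offset")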
>>>
>>> resultdf.writeStream.outputMode("append").format("console")
>>>   .option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali <kanth...@gmail.com> wrote:
>>>
>>>> If I change it to this
>>>>
>>>>
>>>>
>>>>
>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali <kanth...@gmail.com>
>>>> wrote:
>>>>
>>>>> Hi All,
>>>>>
>>>>> I have the following code
>>>>>
>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>
>>>>> val jdf = spark.readStream.format("kafka")
>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>   .option("subscribe", "join_test")
>>>>>   .option("startingOffsets", "earliest")
>>>>>   .load()
>>>>>
>>>>> jdf.createOrReplaceTempView("table")
>>>>>
>>>>> val resultdf = spark.sql(
>>>>>   "select * from table as x inner join table as y on x.offset = y.offset")
>>>>>
>>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>>   .option("truncate", false).trigger(Trigger.ProcessingTime(1000)).start()
>>>>>
>>>>> and I get the following exception.
>>>>>
>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>>>> x.timestamp, x.partition]; line 1 pos 50;
>>>>> 'Project [*]
>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>    :- SubqueryAlias x
>>>>>    :  +- SubqueryAlias table
>>>>>    :     +- StreamingRelation 
>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>>> offset#32L, timestamp#33, timestampType#34]
>>>>>    +- SubqueryAlias y
>>>>>       +- SubqueryAlias table
>>>>>          +- StreamingRelation 
>>>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>>>> offset#32L, timestamp#33, timestampType#34]
>>>>>
>>>>> Any idea what's wrong here?
>>>>>
>>>>> Thanks!
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>>
>>>>
>>>
>>
>
