Understanding about joins in spark

2022-06-27 Thread Sid
Hi Team,

As I understand it, for a large dataset: when we apply a join, data from
different executors is shuffled so that rows with the same key land in
one partition.

This is done for both DataFrames, right? For example, key A from df1 is
sorted and kept in one partition, key A from df2 is sorted and kept in
another partition, and then the two are compared and merged?

I know that for a shuffle hash join, the keys from both DataFrames are
merged in a single partition, since the smaller dataset is copied to every
executor.

Also, where is the join operation performed: on another worker node, or on
the driver side?

Could somebody please help me understand this, either by correcting my
points or by adding an explanation?

TIA,
Sid
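
One rough way to picture the shuffle asked about above, as a minimal sketch
in plain Scala rather than Spark's actual implementation: both sides are
partitioned with the same function, so rows with key A from df1 and from df2
get the same partition index, and each pair of co-located partitions is then
joined on its own. In Spark that per-partition work runs in tasks on the
executors, not on the driver. Copying the smaller side to every executor is
what a broadcast hash join does; a shuffle hash join shuffles both sides,
roughly as modelled here. The names partitionOf, shuffle and joinPartition
are hypothetical helpers made up for this illustration.

```scala
// Toy model of a shuffle join. NOT Spark's code, only the partitioning idea.
object ShuffleJoinSketch {
  // Hypothetical stand-in for a hash partitioner: key -> partition index.
  def partitionOf(key: String, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // "Shuffle": group rows by the partition their key hashes to.
  def shuffle[V](rows: Seq[(String, V)],
                 numPartitions: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (k, _) => partitionOf(k, numPartitions) }

  // Join one pair of co-located partitions (an executor task in Spark).
  def joinPartition[A, B](left: Seq[(String, A)],
                          right: Seq[(String, B)]): Seq[(String, A, B)] =
    for ((lk, lv) <- left; (rk, rv) <- right if lk == rk) yield (lk, lv, rv)

  def join[A, B](df1: Seq[(String, A)], df2: Seq[(String, B)],
                 numPartitions: Int): Seq[(String, A, B)] = {
    val l = shuffle(df1, numPartitions) // both sides use the same
    val r = shuffle(df2, numPartitions) // partitioning function
    (0 until numPartitions).flatMap { p =>
      joinPartition(l.getOrElse(p, Seq.empty), r.getOrElse(p, Seq.empty))
    }
  }

  def main(args: Array[String]): Unit = {
    val df1 = Seq("A" -> 1, "B" -> 2, "A" -> 3)
    val df2 = Seq("A" -> "x", "C" -> "y")
    join(df1, df2, numPartitions = 4).foreach(println)
  }
}
```

Because the partition index depends only on the key, no partition ever needs
rows from another partition to finish its part of the join.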


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-12 Thread Tathagata Das
You have understood the problem correctly. However, note that your
interpretation of the output *(K, leftValue, null)*, *(K, leftValue,
rightValue1)*, *(K, leftValue, rightValue2)* relies on knowing the
semantics of the join. That is, if you are processing the output rows
*manually*, you are aware that the operator is a join, so you can apply
the interpretation *"null is replaced by the first match, then all further
matches are just additional rows"*. This is, however, not a general
solution for any sink and any operator. We need to find a way to expose
these semantics through the APIs so that a sink can use them without
knowing exactly which operator in the query is writing to the sink.
Therefore we still need some work before we can support joins in update
mode with clear semantics.

Hope that makes it clear. :)
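
To make the ambiguity concrete, here is a minimal sketch in plain Scala (no
Spark involved) of two consumers reading the same three update-mode rows. The
names genericSink and joinAwareSink are hypothetical and exist only for this
illustration: the first treats every update as "replace the row with the same
key", the second applies the join-specific reading quoted above.

```scala
// Toy model of how a sink might interpret update-mode output of an outer join.
object UpdateModeSketch {
  type Row = (String, String, Option[String]) // (key, leftValue, rightValue)

  // Rows a left outer join could emit in update mode over three triggers.
  val emitted: Seq[Row] = Seq(
    ("K", "leftValue", None),
    ("K", "leftValue", Some("rightValue1")),
    ("K", "leftValue", Some("rightValue2"))
  )

  // Operator-agnostic sink: each update replaces the row with the same key.
  def genericSink(rows: Seq[Row]): Set[Row] =
    rows.foldLeft(Map.empty[String, Row]) { (table, row) =>
      table + (row._1 -> row)
    }.values.toSet

  // Join-aware consumer: the first match replaces the null row,
  // every later match is kept as an additional row.
  def joinAwareSink(rows: Seq[Row]): Set[Row] =
    rows.foldLeft(Set.empty[Row]) { (table, row) =>
      row._3 match {
        case None => table + row
        case Some(_) =>
          table.filterNot(r => r._1 == row._1 && r._3.isEmpty) + row
      }
    }

  def main(args: Array[String]): Unit = {
    println(genericSink(emitted))   // only the rightValue2 row survives
    println(joinAwareSink(emitted)) // both matched rows survive
  }
}
```

The two consumers end up with different tables from identical input, which is
the information a sink cannot recover without knowing the operator.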


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-10 Thread kant kodali
I will attempt to answer this.

Since rightValue1 and rightValue2 have the same key "K" (two matches), why
would it ever be the case that *rightValue2* replaces *rightValue1*, which
replaced *null*? Moreover, why does the user need to care?

The result in this case (after getting 2 matches) should be

*(K, leftValue, rightValue1)*
*(K, leftValue, rightValue2)*

This basically means only one of them replaced the earlier null. Which one
of the two? Whichever arrived first. In other words, nulls will be
replaced by the first matching row; subsequently, if there is a new matching
row, it will just be another row with the same key in the result table, and
if there is a new unmatched row, the result table should have nulls for the
unmatched fields.

From a user perspective, I believe just emitting nulls on every trigger
until there is a match, and emitting the joined rows once there is a match,
should suffice, shouldn't it?

Sorry if my thoughts are too naive!

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Tathagata Das
This doc is unrelated to the stream-stream join we added in Structured
Streaming. :)

That said, we added append mode first because it is easier to reason about
the semantics of append mode, especially in the context of outer joins. You
output a row only when you know it won't ever be changed. The semantics of
update mode in outer joins are trickier to reason about and expose through
the APIs. Consider a left outer join. As soon as we get a left-side record
with a key K that does not have a match, do we output *(K, leftValue, null)*?
And if we do so, and later get 2 matches from the right side, we have to
output *(K, leftValue, rightValue1)* and *(K, leftValue, rightValue2)*. But
how do we convey that *rightValue1* and *rightValue2* together replace the
earlier *null*, rather than *rightValue2* replacing *rightValue1* replacing
*null*?

We will figure these out in future releases. For now, we have released
append mode, which allows quite a large range of use cases, including
multiple cascading joins.

TD
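
As a rough illustration of "output a row only when you know it won't ever be
changed", here is a minimal sketch in plain Scala, not Spark's implementation.
It assumes, purely for illustration, that each left row carries an event time
and that a watermark plus a maximum delay tell us when no further match can
arrive; LeftRow, RightRow, finalRows, watermark and maxDelay are all
hypothetical names.

```scala
// Toy model of append-mode output for a left outer join.
object AppendModeSketch {
  final case class LeftRow(key: String, value: String, eventTime: Long)
  final case class RightRow(key: String, value: String)
  type Out = (String, String, Option[String]) // (key, leftValue, rightValue)

  // Emit only rows that can no longer change:
  //  - a matched pair is final as soon as both sides have been seen;
  //  - an unmatched left row is emitted with None only after the assumed
  //    watermark has passed eventTime + maxDelay;
  //  - otherwise the row stays buffered and nothing is emitted yet.
  def finalRows(lefts: Seq[LeftRow], rights: Seq[RightRow],
                watermark: Long, maxDelay: Long): Seq[Out] =
    lefts.flatMap { l =>
      val matches = rights.filter(_.key == l.key)
      val out: Seq[Out] =
        if (matches.nonEmpty)
          matches.map(r => (l.key, l.value, Option(r.value)))
        else if (l.eventTime + maxDelay < watermark)
          Seq((l.key, l.value, Option.empty[String]))
        else
          Seq.empty
      out
    }

  def main(args: Array[String]): Unit = {
    val lefts = Seq(LeftRow("K", "a", 10), LeftRow("J", "b", 10),
                    LeftRow("M", "c", 100))
    val rights = Seq(RightRow("K", "r1"), RightRow("K", "r2"))
    finalRows(lefts, rights, watermark = 50, maxDelay = 20).foreach(println)
  }
}
```

In this model a null row is never emitted and later contradicted, so a sink
never has to interpret a replacement, at the cost of delaying unmatched rows.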




Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-08 Thread Gourav Sengupta
super interesting.



Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-07 Thread kant kodali
It looks to me that the StateStore described in this doc actually has a
full outer join, and every other join is a filter of that. Also, the doc
talks about update mode, but it looks like Spark 2.3 ended up with append
mode? Anyway, the moment it is in master I am ready to test, so JIRA tickets
on this would help to keep track. Please let me know.

Thanks!


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Sorry I meant Spark 2.4 in my previous email


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread kant kodali
Hi TD,

I agree; I think we are better off with either a full fix or no fix. I am OK
with the complete fix being available in master or some branch. I guess the
solution for me is to just build from source.

On a similar note, I am not finding any JIRA tickets related to full outer
joins and update mode for, say, Spark 2.3. I wonder how hard it is to
implement both of these? It turns out that update mode and full outer joins
are very useful and required in my case, so I'm just asking.

Thanks!

On Tue, Mar 6, 2018 at 6:25 PM, Tathagata Das wrote:

> I thought about it.
> I am not 100% sure whether this fix should go into 2.3.1.
>
> There are two parts to this bug fix to enable self-joins.
>
> 1. Enabling deduping of leaf logical nodes by extending
> MultiInstanceRelation
>   - This is safe to backport into the 2.3 branch as it does not touch
> production code paths.
>
> 2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
> plan is spliced into the streaming plan.
>   - This touches core production code paths and therefore may not be safe
> to backport.
>
> Part 1 enables self-joins in all but a small fraction of self-join
> queries. That small fraction can produce incorrect results, and part 2
> avoids that.
>
> So for 2.3.1, we can enable self-joins by merging only part 1, but it can
> give wrong results in some cases. I think that is strictly worse than no
> fix.
>
> TD
>
>
>
> On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:
>
>> Hi TD,
>>
>> I pulled your commit that is listed on this ticket
>> (https://issues.apache.org/jira/browse/SPARK-23406). Specifically, I did
>> the following steps, and self-joins work after I cherry-picked your commit!
>> Good job! I was hoping it would be part of 2.3.0, but it looks like it is
>> targeted for 2.3.1 :(
>>
>> git clone https://github.com/apache/spark.git
>> cd spark
>> git fetch
>> git checkout branch-2.3
>> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
>> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
>> ./build/mvn -DskipTests compile
>> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr \
>>   -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>>
>>
>> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
>> tathagata.das1...@gmail.com> wrote:
>>
>>> Hey,
>>>
>>> Thanks for testing out stream-stream joins and reporting this issue. I
>>> am going to take a look at this.
>>>
>>> TD
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>>
>>>> If I change it to the code below, it works. However, I don't believe it
>>>> is the solution I am looking for. I want to be able to do it in raw SQL,
>>>> and moreover, if a user gives a big chained raw Spark SQL join query, I
>>>> am not even sure how to make copies of the DataFrame to achieve the
>>>> self-join. Is there any other way here?
>>>>
>>>>
>>>>
>>>> import org.apache.spark.sql.streaming.Trigger
>>>>
>>>> // Two independent readers over the same Kafka topic
>>>> val jdf = spark.readStream.format("kafka")
>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>   .option("subscribe", "join_test")
>>>>   .option("startingOffsets", "earliest").load()
>>>> val jdf1 = spark.readStream.format("kafka")
>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>   .option("subscribe", "join_test")
>>>>   .option("startingOffsets", "earliest").load()
>>>>
>>>> // Register each copy under its own view name
>>>> jdf.createOrReplaceTempView("table")
>>>> jdf1.createOrReplaceTempView("table1")
>>>>
>>>> val resultdf = spark.sql(
>>>>   "select * from table inner join table1 on table.offset=table1.offset")
>>>>
>>>> resultdf.writeStream.outputMode("append").format("console")
>>>>   .option("truncate", false)
>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>
>>>>
>>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali wrote:
>>>>
>>>>> If I change it to this
>>>>>
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali wrote:
>>>>>
>>>>>> Hi All,
>>>>>>
>>>>>> I have the following code
>>>>>>
>>>>>> import org.apache.spark.sql.streaming.Trigger
>>>>>>
>>>>>> val jdf = spark.readStream.format("kafka")
>>>>>>   .option("kafka.bootstrap.servers", "localhost:9092")
>>>>>>   .option("subscribe", "join_test")
>>>>>>   .option("startingOffsets", "earliest").load()
>>>>>>
>>>>>> jdf.createOrReplaceTempView("table")
>>>>>>
>>>>>> val resultdf = spark.sql(
>>>>>>   "select * from table as x inner join table as y on x.offset=y.offset")
>>>>>>
>>>>>> resultdf.writeStream.outputMode("update").format("console")
>>>>>>   .option("truncate", false)
>>>>>>   .trigger(Trigger.ProcessingTime(1000)).start()
>>>>>>
>>>>>> and I get the following exception.
>>>>>>
>>>>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`'
>>>>>> given input columns: [x.value, x.offset, x.key, x.timestampType,
>>>>>> x.topic, x.timestamp, x.partition]; line 1 pos 50;
>>>>>> 'Project [*]
>>>>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>>>>    :- SubqueryAlias x
>>>>>>

Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-03-06 Thread Tathagata Das
I thought about it.
I am not 100% sure whether this fix should go into 2.3.1.

There are two parts to this bug fix to enable self-joins.

1. Enabling deduping of leaf logical nodes by extending
MultiInstanceRelation
  - This is safe to be backported into the 2.3 branch as it does not touch
production code paths.

2. Fixing attribute rewriting in MicroBatchExecution, when the micro-batch
plan is spliced into the streaming plan.
  - This touches core production code paths and therefore may not be safe to
backport.

Part 1 enables self-joins in all but a small fraction of self-join queries.
That small fraction can produce incorrect results, and part 2 avoids that.

So for 2.3.1, we can enable self-joins by merging only part 1, but it can
give wrong results in some cases. I think that is strictly worse than no
fix.

TD



On Thu, Feb 22, 2018 at 2:32 PM, kant kodali  wrote:

> Hi TD,
>
> I pulled your commit that is listed on this ticket
> https://issues.apache.org/jira/browse/SPARK-23406 specifically I did the
> following steps and self joins work after I cherry-pick your commit! Good
> Job! I was hoping it will be part of 2.3.0 but looks like it is targeted
> for 2.3.1 :(
>
> git clone https://github.com/apache/spark.git
> cd spark
> git fetch
> git checkout branch-2.3
> git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
> export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
> ./build/mvn -DskipTests compile
> ./dev/make-distribution.sh --name custom-spark --pip --r --tgz -Psparkr 
> -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn
>
>
> On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das <
> tathagata.das1...@gmail.com> wrote:
>
>> Hey,
>>
>> Thanks for testing out stream-stream joins and reporting this issue. I am
>> going to take a look at this.
>>
>> TD
>>
>>
>>
>> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>>
>>> if I change it to the below code it works. However, I don't believe it
>>> is the solution I am looking for. I want to be able to do it in raw SQL and
>>> moreover, If a user gives a big chained raw spark SQL join query I am not
>>> even sure how to make copies of the dataframe to achieve the self-join. Is
>>> there any other way here?
>>>
>>>
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>> val jdf1 = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>> jdf1.createOrReplaceTempView("table1")
>>>
>>> val resultdf = spark.sql("select * from table inner join table1 on 
>>> table.offset=table1.offset")
>>>
>>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>>
>>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>>
 If I change it to this




 On Tue, Feb 20, 2018 at 7:52 PM, kant kodali 
 wrote:

> Hi All,
>
> I have the following code
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = 
> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", 
> "join_test").option("startingOffsets", "earliest").load();
>
> jdf.createOrReplaceTempView("table")
>
> val resultdf = spark.sql("select * from table as x inner join table as y 
> on x.offset=y.offset")
>
> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
> and I get the following exception.
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
> x.timestamp, x.partition]; line 1 pos 50;
> 'Project [*]
> +- 'Join Inner, ('x.offset = 'y.offset)
>:- SubqueryAlias x
>:  +- SubqueryAlias table
>: +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>+- SubqueryAlias y
>   +- SubqueryAlias table
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> 

Joins in spark for large tables

2018-02-28 Thread KhajaAsmath Mohammed
Hi,

Is there a recommended approach to reduce shuffling in Spark? I have two
tables and both of them are large. Any suggestions? I have only seen advice
about broadcast joins, but that will not work in my case.

Thanks,
Asmath


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread kant kodali
Hi TD,

I pulled your commit that is listed on this ticket
https://issues.apache.org/jira/browse/SPARK-23406 and followed the steps
below. Self joins work after I cherry-pick your commit!
Good job! I was hoping it would be part of 2.3.0, but it looks like it is
targeted for 2.3.1 :(

git clone https://github.com/apache/spark.git
cd spark
git fetch
git checkout branch-2.3
git cherry-pick 658d9d9d785a30857bf35d164e6cbbd9799d6959
export MAVEN_OPTS="-Xmx2g -XX:ReservedCodeCacheSize=512m"
./build/mvn -DskipTests compile
./dev/make-distribution.sh --name custom-spark --pip --r --tgz \
  -Psparkr -Phadoop-2.7 -Phive -Phive-thriftserver -Pmesos -Pyarn


On Thu, Feb 22, 2018 at 11:25 AM, Tathagata Das  wrote:

> Hey,
>
> Thanks for testing out stream-stream joins and reporting this issue. I am
> going to take a look at this.
>
> TD
>
>
>
> On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:
>
>> if I change it to the below code it works. However, I don't believe it is
>> the solution I am looking for. I want to be able to do it in raw SQL and
>> moreover, If a user gives a big chained raw spark SQL join query I am not
>> even sure how to make copies of the dataframe to achieve the self-join. Is
>> there any other way here?
>>
>>
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>> val jdf1 = 
>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>> jdf1.createOrReplaceTempView("table1")
>>
>> val resultdf = spark.sql("select * from table inner join table1 on 
>> table.offset=table1.offset")
>>
>> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>>
>> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>>
>>> If I change it to this
>>>
>>>
>>>
>>>
>>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>>
 Hi All,

 I have the following code

 import org.apache.spark.sql.streaming.Trigger

 val jdf = 
 spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
 "localhost:9092").option("subscribe", 
 "join_test").option("startingOffsets", "earliest").load();

 jdf.createOrReplaceTempView("table")

 val resultdf = spark.sql("select * from table as x inner join table as y 
 on x.offset=y.offset")

 resultdf.writeStream.outputMode("update").format("console").option("truncate",
  false).trigger(Trigger.ProcessingTime(1000)).start()

 and I get the following exception.

 org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
 input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
 x.timestamp, x.partition]; line 1 pos 50;
 'Project [*]
 +- 'Join Inner, ('x.offset = 'y.offset)
:- SubqueryAlias x
:  +- SubqueryAlias table
: +- StreamingRelation 
 DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
 localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
 offset#32L, timestamp#33, timestampType#34]
+- SubqueryAlias y
   +- SubqueryAlias table
  +- StreamingRelation 
 DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
 localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
 offset#32L, timestamp#33, timestampType#34]

 any idea whats wrong here?

 Thanks!







>>>
>>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-22 Thread Tathagata Das
Hey,

Thanks for testing out stream-stream joins and reporting this issue. I am
going to take a look at this.

TD



On Tue, Feb 20, 2018 at 8:20 PM, kant kodali  wrote:

> if I change it to the below code it works. However, I don't believe it is
> the solution I am looking for. I want to be able to do it in raw SQL and
> moreover, If a user gives a big chained raw spark SQL join query I am not
> even sure how to make copies of the dataframe to achieve the self-join. Is
> there any other way here?
>
>
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
> val jdf1 = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
> jdf1.createOrReplaceTempView("table1")
>
> val resultdf = spark.sql("select * from table inner join table1 on 
> table.offset=table1.offset")
>
> resultdf.writeStream.outputMode("append").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
>
> On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:
>
>> If I change it to this
>>
>>
>>
>>
>> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>>
>>> Hi All,
>>>
>>> I have the following code
>>>
>>> import org.apache.spark.sql.streaming.Trigger
>>>
>>> val jdf = 
>>> spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>>> "localhost:9092").option("subscribe", 
>>> "join_test").option("startingOffsets", "earliest").load();
>>>
>>> jdf.createOrReplaceTempView("table")
>>>
>>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>>> x.offset=y.offset")
>>>
>>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>>
>>> and I get the following exception.
>>>
>>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>>> x.timestamp, x.partition]; line 1 pos 50;
>>> 'Project [*]
>>> +- 'Join Inner, ('x.offset = 'y.offset)
>>>:- SubqueryAlias x
>>>:  +- SubqueryAlias table
>>>: +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>+- SubqueryAlias y
>>>   +- SubqueryAlias table
>>>  +- StreamingRelation 
>>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>>> offset#32L, timestamp#33, timestampType#34]
>>>
>>> any idea whats wrong here?
>>>
>>> Thanks!
>>>
>>>
>>>
>>>
>>>
>>>
>>>
>>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
If I change it to the code below, it works. However, I don't believe it is
the solution I am looking for. I want to be able to do it in raw SQL.
Moreover, if a user gives a big chained raw Spark SQL join query, I am not
even sure how to make copies of the dataframe to achieve the self-join. Is
there any other way here?



import org.apache.spark.sql.streaming.Trigger

// Two independent readers over the same topic, registered under two view names.
val jdf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()
val jdf1 = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")
jdf1.createOrReplaceTempView("table1")

val resultdf = spark.sql("select * from table inner join table1 on table.offset=table1.offset")

resultdf.writeStream.outputMode("append").format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()


On Tue, Feb 20, 2018 at 8:16 PM, kant kodali  wrote:

> If I change it to this
>
>
>
>
> On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:
>
>> Hi All,
>>
>> I have the following code
>>
>> import org.apache.spark.sql.streaming.Trigger
>>
>> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
>> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
>> "earliest").load();
>>
>> jdf.createOrReplaceTempView("table")
>>
>> val resultdf = spark.sql("select * from table as x inner join table as y on 
>> x.offset=y.offset")
>>
>> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>>  false).trigger(Trigger.ProcessingTime(1000)).start()
>>
>> and I get the following exception.
>>
>> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
>> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
>> x.timestamp, x.partition]; line 1 pos 50;
>> 'Project [*]
>> +- 'Join Inner, ('x.offset = 'y.offset)
>>:- SubqueryAlias x
>>:  +- SubqueryAlias table
>>: +- StreamingRelation 
>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>> offset#32L, timestamp#33, timestampType#34]
>>+- SubqueryAlias y
>>   +- SubqueryAlias table
>>  +- StreamingRelation 
>> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
>> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
>> offset#32L, timestamp#33, timestampType#34]
>>
>> any idea whats wrong here?
>>
>> Thanks!
>>
>>
>>
>>
>>
>>
>>
>


Re: what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
If I change it to this




On Tue, Feb 20, 2018 at 7:52 PM, kant kodali  wrote:

> Hi All,
>
> I have the following code
>
> import org.apache.spark.sql.streaming.Trigger
>
> val jdf = spark.readStream.format("kafka").option("kafka.bootstrap.servers", 
> "localhost:9092").option("subscribe", "join_test").option("startingOffsets", 
> "earliest").load();
>
> jdf.createOrReplaceTempView("table")
>
> val resultdf = spark.sql("select * from table as x inner join table as y on 
> x.offset=y.offset")
>
> resultdf.writeStream.outputMode("update").format("console").option("truncate",
>  false).trigger(Trigger.ProcessingTime(1000)).start()
>
> and I get the following exception.
>
> org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given 
> input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, 
> x.timestamp, x.partition]; line 1 pos 50;
> 'Project [*]
> +- 'Join Inner, ('x.offset = 'y.offset)
>:- SubqueryAlias x
>:  +- SubqueryAlias table
>: +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>+- SubqueryAlias y
>   +- SubqueryAlias table
>  +- StreamingRelation 
> DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets
>  -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> 
> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, 
> offset#32L, timestamp#33, timestampType#34]
>
> any idea whats wrong here?
>
> Thanks!
>
>
>
>
>
>
>


what is the right syntax for self joins in Spark 2.3.0 ?

2018-02-20 Thread kant kodali
Hi All,

I have the following code

import org.apache.spark.sql.streaming.Trigger

val jdf = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "join_test")
  .option("startingOffsets", "earliest")
  .load()

jdf.createOrReplaceTempView("table")

val resultdf = spark.sql("select * from table as x inner join table as y on x.offset=y.offset")

resultdf.writeStream.outputMode("update").format("console")
  .option("truncate", false)
  .trigger(Trigger.ProcessingTime(1000))
  .start()

and I get the following exception.

org.apache.spark.sql.AnalysisException: cannot resolve '`x.offset`' given input columns: [x.value, x.offset, x.key, x.timestampType, x.topic, x.timestamp, x.partition]; line 1 pos 50;
'Project [*]
+- 'Join Inner, ('x.offset = 'y.offset)
   :- SubqueryAlias x
   :  +- SubqueryAlias table
   :     +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]
   +- SubqueryAlias y
      +- SubqueryAlias table
         +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@15f3f9cf,kafka,List(),None,List(),None,Map(startingOffsets -> earliest, subscribe -> join_test, kafka.bootstrap.servers -> localhost:9092),None), kafka, [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34]

Any idea what's wrong here?

Thanks!
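
For comparison, here is a minimal sketch of the same aliased self-join on a static (batch) DataFrame, where this SQL form is the usual syntax. The view name `events`, the sample rows and the object name `BatchSelfJoin` are made up for illustration; a small in-memory dataset stands in for the Kafka source. It only shows the alias syntax and is not a fix for the streaming case.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object BatchSelfJoin {
  // Registers a small static table and joins it with itself under aliases x and y.
  def selfJoin(spark: SparkSession): DataFrame = {
    import spark.implicits._

    // Hypothetical stand-in for the Kafka records: (offset, value) pairs.
    val df = Seq((0L, "a"), (1L, "b"), (2L, "c")).toDF("offset", "value")
    df.createOrReplaceTempView("events")

    spark.sql(
      "select x.offset, x.value as left_value, y.value as right_value " +
        "from events as x inner join events as y on x.offset = y.offset")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("batch-self-join")
      .getOrCreate()
    selfJoin(spark).show(false)
    spark.stop()
  }
}
```

Each offset is unique in the sample data, so every row joins with exactly one copy of itself.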


Re: DataFrame joins with Spark-Java

2017-11-29 Thread Rishi Mishra
Hi Sushma,
Can you try a left anti join, as below? In my example, name and id
together form the key.

df1.alias("a")
    .join(df2.alias("b"),
        col("a.name").equalTo(col("b.name"))
            .and(col("a.id").equalTo(col("b.id"))),
        "left_anti")
    .selectExpr("name", "id")
    .show(10, false);
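
When the key columns arrive as a list, the join does not have to be written out column by column. Below is a minimal Scala sketch, assuming both dataframes use the same names for the key columns; `NewRecords`, `newRecords` and the sample data are made up for illustration. It relies on the `join(right, usingColumns, joinType)` overload of Dataset. From Java, the same overload is available, but the `List` has to be converted to a Scala `Seq` first.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object NewRecords {
  // Rows of `today` whose key columns have no match in `yesterday`.
  // `primaryKeys` may hold one column name or several.
  def newRecords(today: DataFrame, yesterday: DataFrame, primaryKeys: Seq[String]): DataFrame =
    today.join(yesterday, primaryKeys, "left_anti")

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("new-records")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data standing in for the two daily snapshots.
    val yesterday = Seq(("alice", 1, "x"), ("bob", 2, "y")).toDF("name", "id", "payload")
    val today = Seq(("alice", 1, "x2"), ("bob", 2, "y"), ("carol", 3, "z"))
      .toDF("name", "id", "payload")

    // Only rows whose (name, id) pair is absent from yesterday are kept.
    newRecords(today, yesterday, Seq("name", "id")).show(false)
    spark.stop()
  }
}
```

A left anti join returns only columns from the left side, so the result has the same schema as today's dataframe.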

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra

On Thu, Nov 30, 2017 at 7:38 AM, sushma spark 
wrote:

> Dear Friends,
>
> I am new to spark DataFrame. My requirement is i have a dataframe1
> contains the today's records and dataframe2 contains yesterday's records. I
> need to compare the today's records with yesterday's records and find out
> new records which are not exists in the yesterday's records based on the
> primary key of the column. Here, the problem is sometimes there are
> multiple columns having primary keys.
>
> I am receiving primary key columns in a List.
>
> example:
>
> List primaryKeyList = listOfPrimarykeys; // single or multiple
> primary key columns
>
> DataFrame currentDataRecords = queryexecutor.getCurrentRecords(); // this
> contains today's records
> DataFrame yesterdayRecords = queryexecutor.getYesterdayRecords();// this
> contains yesterday's records
>
> Can you anyone help me how to join these two dataframes and apply WHERE
> conditions on columns dynamically with SPARK-JAVA code.
>
> Thanks
> Sushma
>
>


DataFrame joins with Spark-Java

2017-11-29 Thread sushma spark
Dear Friends,

I am new to Spark DataFrames. My requirement is this: dataframe1 contains
today's records and dataframe2 contains yesterday's records. I need to
compare today's records with yesterday's records and find the new records
that do not exist in yesterday's records, based on the primary key
columns. The problem is that sometimes the primary key spans multiple
columns.

I receive the primary key columns in a List.

Example:

List<String> primaryKeyList = listOfPrimarykeys; // single or multiple primary key columns

DataFrame currentDataRecords = queryexecutor.getCurrentRecords(); // today's records
DataFrame yesterdayRecords = queryexecutor.getYesterdayRecords(); // yesterday's records

Can anyone help me join these two dataframes and apply WHERE conditions on
the key columns dynamically with Spark Java code?

Thanks
Sushma


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Sorry, I had a typo. I meant repartition by the join field, e.g.
repartition(col("fieldofjoin")).

On May 2, 2017 at 9:44 PM, "KhajaAsmath Mohammed"
wrote:

Hi Angel,

I am trying the code below, but I don't see any partitioning on the dataframe.

  val iftaGPSLocation_df = sqlContext.sql(iftaGPSLocQry)
  import sqlContext._
  import sqlContext.implicits._
  datapoint_prq_df.join(geoCacheLoc_df)

Val tableA = DfA.partitionby("joinField").filter("firstSegment")

Columns I have are Lat3,Lon3, VIN, Time  . Lat3 and Lon3 are my join
columns on both dataframes and rest are select columns

Thanks,
Asmath



On Tue, May 2, 2017 at 1:38 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Have you tried to make partition by join's field and run it by segments,
> filtering both tables at the same segments of data?
>
> Example:
>
> Val tableA = DfA.partitionby("joinField").filter("firstSegment")
> Val tableB= DfB.partitionby("joinField").filter("firstSegment")
>
> TableA.join(TableB)
>
> On May 2, 2017 at 8:30 PM, "KhajaAsmath Mohammed"
> wrote:
>
>> Table 1 (192 GB) is partitioned by year and month ... 192 GB of data is
>> for one month i.e. for April
>>
>> Table 2: 92 GB not partitioned .
>>
>> I have to perform join on  these tables now.
>>
>>
>>
>> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
>> angel.francisco.o...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Are the tables partitioned?
>>> If yes, what is the partition field?
>>>
>>> Thanks
>>>
>>>
>>> On May 2, 2017 at 8:22 PM, "KhajaAsmath Mohammed" <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to join two big tables in spark and the job is running for
>>> quite a long time without any results.
>>>
>>> Table 1: 192GB
>>> Table 2: 92 GB
>>>
>>> Does anyone have better solution to get the results fast?
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>


Re: Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Hi Angel,

I am trying the code below, but I don't see any partitioning on the dataframe.

  val iftaGPSLocation_df = sqlContext.sql(iftaGPSLocQry)
  import sqlContext._
  import sqlContext.implicits._
  datapoint_prq_df.join(geoCacheLoc_df)

Val tableA = DfA.partitionby("joinField").filter("firstSegment")

Columns I have are Lat3,Lon3, VIN, Time  . Lat3 and Lon3 are my join
columns on both dataframes and rest are select columns

Thanks,
Asmath



On Tue, May 2, 2017 at 1:38 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Have you tried to make partition by join's field and run it by segments,
> filtering both tables at the same segments of data?
>
> Example:
>
> Val tableA = DfA.partitionby("joinField").filter("firstSegment")
> Val tableB= DfB.partitionby("joinField").filter("firstSegment")
>
> TableA.join(TableB)
>
> On May 2, 2017 at 8:30 PM, "KhajaAsmath Mohammed"
> wrote:
>
>> Table 1 (192 GB) is partitioned by year and month ... 192 GB of data is
>> for one month i.e. for April
>>
>> Table 2: 92 GB not partitioned .
>>
>> I have to perform join on  these tables now.
>>
>>
>>
>> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
>> angel.francisco.o...@gmail.com> wrote:
>>
>>> Hello,
>>>
>>> Are the tables partitioned?
>>> If yes, what is the partition field?
>>>
>>> Thanks
>>>
>>>
>>> On May 2, 2017 at 8:22 PM, "KhajaAsmath Mohammed" <
>>> mdkhajaasm...@gmail.com> wrote:
>>>
>>> Hi,
>>>
>>> I am trying to join two big tables in spark and the job is running for
>>> quite a long time without any results.
>>>
>>> Table 1: 192GB
>>> Table 2: 92 GB
>>>
>>> Does anyone have better solution to get the results fast?
>>>
>>> Thanks,
>>> Asmath
>>>
>>>
>>>
>>


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Have you tried partitioning by the join field and running the join in
segments, filtering both tables to the same segment of data?

Example:

Val tableA = DfA.partitionby("joinField").filter("firstSegment")
Val tableB= DfB.partitionby("joinField").filter("firstSegment")

TableA.join(TableB)
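
A runnable form of the idea might look like the sketch below. It uses `Dataset.repartition` on the join column, since DataFrame has no `partitionby` method. The names `SegmentedJoin` and `joinSegment`, the `segmentFilter` expression and the single join column are assumptions for illustration; the real tables in this thread join on more than one column.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object SegmentedJoin {
  // Joins one segment of the two tables. `segmentFilter` is a SQL predicate
  // that must select the same slice of data on both sides.
  def joinSegment(dfA: DataFrame, dfB: DataFrame,
                  joinField: String, segmentFilter: String): DataFrame = {
    // Filter first so that less data is shuffled, then hash-partition both
    // sides on the join column.
    val tableA = dfA.filter(segmentFilter).repartition(col(joinField))
    val tableB = dfB.filter(segmentFilter).repartition(col(joinField))
    tableA.join(tableB, Seq(joinField))
  }
}
```

Running `joinSegment` once per segment and writing out each result keeps every individual job smaller; whether that is faster overall depends on the data and the cluster.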

On May 2, 2017 at 8:30 PM, "KhajaAsmath Mohammed"
wrote:

> Table 1 (192 GB) is partitioned by year and month ... 192 GB of data is
> for one month i.e. for April
>
> Table 2: 92 GB not partitioned .
>
> I have to perform join on  these tables now.
>
>
>
> On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
> angel.francisco.o...@gmail.com> wrote:
>
>> Hello,
>>
>> Are the tables partitioned?
>> If yes, what is the partition field?
>>
>> Thanks
>>
>>
>> On May 2, 2017 at 8:22 PM, "KhajaAsmath Mohammed" <
>> mdkhajaasm...@gmail.com> wrote:
>>
>> Hi,
>>
>> I am trying to join two big tables in spark and the job is running for
>> quite a long time without any results.
>>
>> Table 1: 192GB
>> Table 2: 92 GB
>>
>> Does anyone have better solution to get the results fast?
>>
>> Thanks,
>> Asmath
>>
>>
>>
>


Re: Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Table 1 (192 GB) is partitioned by year and month ... 192 GB of data is for
one month i.e. for April

Table 2: 92 GB not partitioned .

I have to perform join on  these tables now.



On Tue, May 2, 2017 at 1:27 PM, Angel Francisco Orta <
angel.francisco.o...@gmail.com> wrote:

> Hello,
>
> Are the tables partitioned?
> If yes, what is the partition field?
>
> Thanks
>
>
> On May 2, 2017 at 8:22 PM, "KhajaAsmath Mohammed"
> wrote:
>
> Hi,
>
> I am trying to join two big tables in spark and the job is running for
> quite a long time without any results.
>
> Table 1: 192GB
> Table 2: 92 GB
>
> Does anyone have better solution to get the results fast?
>
> Thanks,
> Asmath
>
>
>


Re: Joins in Spark

2017-05-02 Thread Angel Francisco Orta
Hello,

Are the tables partitioned?
If yes, what is the partition field?

Thanks


On May 2, 2017 at 8:22 PM, "KhajaAsmath Mohammed"
wrote:

Hi,

I am trying to join two big tables in spark and the job is running for
quite a long time without any results.

Table 1: 192GB
Table 2: 92 GB

Does anyone have better solution to get the results fast?

Thanks,
Asmath


Joins in Spark

2017-05-02 Thread KhajaAsmath Mohammed
Hi,

I am trying to join two big tables in spark and the job is running for
quite a long time without any results.

Table 1: 192GB
Table 2: 92 GB

Does anyone have a better solution to get the results faster?

Thanks,
Asmath


RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-07-21 Thread Ravi Aggarwal
Hi Ian,

Thanks for the information.

I think you are referring to this post:
http://apache-spark-user-list.1001560.n3.nabble.com/How-spark-decides-whether-to-do-BroadcastHashJoin-or-SortMergeJoin-td27369.html

Yes, I could work around my issue above by setting
spark.sql.autoBroadcastJoinThreshold=-1, so that the planner always picks a
Sort-Merge join instead of BroadcastHashJoin. The ideal fix for me, though,
is to calculate the size of my custom default source (BaseRelation's
sizeInBytes) correctly, so that the Spark planner makes the appropriate
decision for me.
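
To make the two options concrete, here is a minimal sketch. `JoinPlanningSketch`, `SizedRelation` and `estimatedBytes` are hypothetical names for illustration and are not part of my actual data source; how the size is estimated for a real HBase table is left open, and the scan is only a placeholder.

```scala
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext, SparkSession}
import org.apache.spark.sql.sources.{BaseRelation, TableScan}
import org.apache.spark.sql.types.StructType

object JoinPlanningSketch {
  // Workaround: switch off automatic broadcast joins for this session.
  def disableAutoBroadcast(spark: SparkSession): Unit =
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
}

// Hypothetical relation that reports an explicit size estimate to the planner.
class SizedRelation(val schema: StructType, estimatedBytes: Long)
                   (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  // The planner compares this estimate with spark.sql.autoBroadcastJoinThreshold
  // when deciding whether a relation is small enough to broadcast.
  override def sizeInBytes: Long = estimatedBytes

  // Placeholder scan; a real source would read its rows here.
  override def buildScan(): RDD[Row] = sqlContext.sparkContext.emptyRDD[Row]
}
```

With a realistic `sizeInBytes`, the threshold can stay at its default and the planner can choose between the two join types per query.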

Thanks
Ravi

From: ianoconn...@gmail.com [mailto:ianoconn...@gmail.com] On Behalf Of Ian 
O'Connell
Sent: Wednesday, July 20, 2016 11:05 PM
To: Ravi Aggarwal <raagg...@adobe.com>
Cc: Ted Yu <yuzhih...@gmail.com>; user <user@spark.apache.org>
Subject: Re: OutOfMemory when doing joins in spark 2.0 while same code runs 
fine in spark 1.5.2

Ravi did your issue ever get solved for this?

I think I've been hitting the same thing. It looks like the 
spark.sql.autoBroadcastJoinThreshold setting isn't kicking in as expected; if 
I set it to -1, the computation proceeds successfully.

On Tue, Jun 14, 2016 at 12:28 AM, Ravi Aggarwal 
<raagg...@adobe.com<mailto:raagg...@adobe.com>> wrote:
Hi,

Is there any breakthrough here?

I had one more observation while debugging the issue
Here are the 4 types of data I had:

Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and type of join done by spark:
Da and Di     Sort-Merge   failed (OOM)
Da and Dl1    B-H          passed
Da and Dl2    Sort-Merge   passed
Di and Dl1    B-H          passed
Di and Dl2    Sort-Merge   failed (OOM)

From these entries I can deduce that the problem is with the sort-merge join
involving Di. So HBase is out of the equation; it is not the culprit.
In the physical plan I could see only two operations that sort-merge does in
addition to broadcast-hash:

==> Exchange Hashpartitioning

==> Sort
And finally the sort-merge join itself.
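
As a reminder of what the last two steps do, here is a plain-Scala sketch of an inner sort-merge join over in-memory sequences. It is not Spark's implementation, only the textbook algorithm; `SortMergeJoinSketch` and `sortMergeJoin` are names made up for illustration, and the exchange step is left out because everything sits in one collection.

```scala
object SortMergeJoinSketch {
  // Inner-joins two key/value sequences: sort both sides on the key,
  // then walk them in step and pair up runs of equal keys.
  def sortMergeJoin[K: Ordering, A, B](left: Seq[(K, A)],
                                       right: Seq[(K, B)]): Vector[(K, A, B)] = {
    val ord = implicitly[Ordering[K]]
    val l = left.sortBy(_._1).toVector   // the "Sort" step, left side
    val r = right.sortBy(_._1).toVector  // the "Sort" step, right side
    val out = Vector.newBuilder[(K, A, B)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val c = ord.compare(l(i)._1, r(j)._1)
      if (c < 0) i += 1        // left key is smaller: advance left
      else if (c > 0) j += 1   // right key is smaller: advance right
      else {
        val key = l(i)._1
        // Find the end of the run of rows sharing this key on each side.
        var iEnd = i
        while (iEnd < l.length && ord.equiv(l(iEnd)._1, key)) iEnd += 1
        var jEnd = j
        while (jEnd < r.length && ord.equiv(r(jEnd)._1, key)) jEnd += 1
        // Emit every left/right combination for the key.
        for (a <- i until iEnd; b <- j until jEnd)
          out += ((key, l(a)._2, r(b)._2))
        i = iEnd
        j = jEnd
      }
    }
    out.result()
  }
}
```

In this sketch, all rows that share a key are paired with each other in the merge step, so a key with many rows on both sides produces a large cross product for that key.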

Can we deduce anything from this?

Thanks
Ravi
From: Ravi Aggarwal
Sent: Friday, June 10, 2016 12:31 PM
To: 'Ted Yu' <yuzhih...@gmail.com<mailto:yuzhih...@gmail.com>>
Cc: user <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: RE: OutOfMemory when doing joins in spark 2.0 while same code runs 
fine in spark 1.5.2

Hi Ted,
Thanks for the reply.

Here is the code
Btw – df.count is running fine on the dataframe generated from this default 
source. I think it is something in the combination of the join and the HBase 
data source that is creating the issue, but I am not entirely sure.
I have also dumped the physical plans of both approaches, the s3a/s3a join and 
the s3a/hbase join. Let me know in case you want them.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext,
                              parameters: Map[String, String],
                              schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext)
  extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

    val bcDataSchema = sparkContext.broadcast(schema)

    val tableName = parameters.get("path") match 

Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-07-20 Thread Ian O'Connell
Ravi, did this issue ever get solved?

I think I've been hitting the same thing. It looks like the
spark.sql.autoBroadcastJoinThreshold setting isn't kicking in as
expected; if I set it to -1, the computation proceeds successfully.
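
For anyone wanting to try the same workaround, here is a minimal sketch of changing that setting on a Spark 2.0 session. The app name and the local[*] master are placeholders, not taken from this thread. A value of -1 disables automatic broadcast joins, so the planner has to pick a shuffle-based join instead.

```scala
import org.apache.spark.sql.SparkSession

object DisableAutoBroadcast {
  def main(args: Array[String]): Unit = {
    // Placeholder app name and master; adjust for a real cluster.
    val spark = SparkSession.builder()
      .appName("disable-auto-broadcast")
      .master("local[*]")
      .getOrCreate()

    // -1 turns off automatic broadcast joins for this session.
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

    // Print the value back to confirm what the session is using.
    println(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))
    spark.stop()
  }
}
```

The same setting can also be passed at submit time with --conf spark.sql.autoBroadcastJoinThreshold=-1.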


RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-06-14 Thread Ravi Aggarwal
Hi,

Is there any breakthrough here?

I had one more observation while debugging the issue
Here are the 4 types of data I had:

Da -> stored in parquet
Di -> stored in parquet
Dl1 -> parquet version of lookup
Dl2 -> hbase version of lookup

Joins performed and type of join done by spark:
Join          Type         Result
Da and Di     Sort-Merge   failed (OOM)
Da and Dl1    B-H          passed
Da and Dl2    Sort-Merge   passed
Di and Dl1    B-H          passed
Di and Dl2    Sort-Merge   failed (OOM)

From these entries I can deduce that the problem is with sort-merge joins involving Di.
So HBase is out of the equation; it is not the culprit.
In the physical plan I could see only two operations that are done
additionally in sort-merge as compared to broadcast-hash:

==> Exchange Hashpartitioning

==> Sort
And finally sort-merge join.

Can we deduce anything from this?

Thanks
Ravi
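
The three plan steps named above (Exchange hashpartitioning, Sort, sort-merge join) can be modelled in plain Scala as a rough sketch. This is only an illustration on in-memory collections with String keys, not Spark's implementation; the object and method names are made up for this example.

```scala
object SortMergeSketch {
  // Step 1 ("Exchange hashpartitioning"): route every row to a partition by key hash.
  def hashPartition[V](rows: Seq[(String, V)], numPartitions: Int): Map[Int, Seq[(String, V)]] =
    rows.groupBy { case (k, _) => Math.floorMod(k.hashCode, numPartitions) }

  // Steps 2 and 3 ("Sort" + merge): sort both sides by key, then walk them together.
  def sortMergeJoin[A, B](left: Seq[(String, A)], right: Seq[(String, B)]): Seq[(String, A, B)] = {
    val l = left.sortBy(_._1).toVector
    val r = right.sortBy(_._1).toVector
    val out = Vector.newBuilder[(String, A, B)]
    var i = 0
    var j = 0
    while (i < l.length && j < r.length) {
      val cmp = l(i)._1.compareTo(r(j)._1)
      if (cmp < 0) i += 1
      else if (cmp > 0) j += 1
      else {
        val key = l(i)._1
        // Find the end of the run of equal keys on the right side ...
        var jEnd = j
        while (jEnd < r.length && r(jEnd)._1 == key) jEnd += 1
        // ... and pair that run with every left row carrying the same key.
        while (i < l.length && l(i)._1 == key) {
          var k = j
          while (k < jEnd) { out += ((key, l(i)._2, r(k)._2)); k += 1 }
          i += 1
        }
        j = jEnd
      }
    }
    out.result()
  }

  // Equal keys hash to the same partition index on both sides,
  // so each partition can be joined independently of the others.
  def join[A, B](left: Seq[(String, A)], right: Seq[(String, B)], numPartitions: Int): Seq[(String, A, B)] = {
    val lp = hashPartition(left, numPartitions)
    val rp = hashPartition(right, numPartitions)
    (0 until numPartitions).flatMap { p =>
      sortMergeJoin(lp.getOrElse(p, Seq.empty), rp.getOrElse(p, Seq.empty))
    }
  }
}
```

Broadcast-hash skips the first two steps: the small side is copied to every task and probed as a hash table, so neither side is shuffled or sorted.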

RE: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-06-10 Thread Ravi Aggarwal
Hi Ted,
Thanks for the reply.

Here is the code.
Btw, df.count runs fine on a dataframe generated from this default source.
I think it is something in the combination of join and HBase data source that
is creating the issue, but I am not entirely sure.
I have also dumped the physical plans of both approaches (s3a/s3a join and
s3a/hbase join). In case you want them, let me know.

import org.apache.hadoop.fs.FileStatus
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.{TableInputFormat}
import org.apache.hadoop.hbase._
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.CatalystTypeConverters
import org.apache.spark.sql.catalyst.expressions.{Cast, Literal}
import org.apache.spark.sql.execution.datasources.{OutputWriterFactory, FileFormat}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.slf4j.LoggerFactory

class DefaultSource extends SchemaRelationProvider with FileFormat {

  override def createRelation(sqlContext: SQLContext, parameters: Map[String, String], schema: StructType) = {
    new HBaseRelation(schema, parameters)(sqlContext)
  }

  def inferSchema(sparkSession: SparkSession,
                  options: Map[String, String],
                  files: Seq[FileStatus]): Option[StructType] = ???

  def prepareWrite(sparkSession: SparkSession,
                   job: Job,
                   options: Map[String, String],
                   dataSchema: StructType): OutputWriterFactory = ???
}

object HBaseConfigurationUtil {
  lazy val logger = LoggerFactory.getLogger("HBaseConfigurationUtil")
  val hbaseConfiguration = (tableName: String, hbaseQuorum: String) => {
    val conf = HBaseConfiguration.create()
    conf.set(TableInputFormat.INPUT_TABLE, tableName)
    conf.set("hbase.mapred.outputtable", tableName)
    conf.set("hbase.zookeeper.quorum", hbaseQuorum)
    conf
  }
}

class HBaseRelation(val schema: StructType, parameters: Map[String, String])
                   (@transient val sqlContext: SQLContext) extends BaseRelation with TableScan {

  import sqlContext.sparkContext

  override def buildScan(): RDD[Row] = {

    val bcDataSchema = sparkContext.broadcast(schema)

    // The table name arrives as the "path" option.
    val tableName = parameters.get("path") match {
      case Some(t) => t
      case _ => throw new RuntimeException("Table name (path) not provided in parameters")
    }

    val hbaseQuorum = parameters.get("hbaseQuorum") match {
      case Some(s: String) => s
      case _ => throw new RuntimeException("hbaseQuorum not provided in options")
    }

    val rdd = sparkContext.newAPIHadoopRDD(
      HBaseConfigurationUtil.hbaseConfiguration(tableName, hbaseQuorum),
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result]
    )

    val rowRdd = rdd
      .map(tuple => tuple._2)
      .map { record =>

        val cells: java.util.List[Cell] = record.listCells()

        // Row key first, followed by the value of every cell.
        val splitRec = cells.toArray.foldLeft(Array(CellUtil.cloneRow(cells.get(0)))) {(a, b) =>
          a :+ CellUtil.cloneValue(b.asInstanceOf[Cell])
        }

        // The key column is the schema field whose metadata has isPrimary = true.
        val keyFieldName = bcDataSchema.value.fields.filter(e =>
          e.metadata.contains("isPrimary") && e.metadata.getBoolean("isPrimary"))(0).name

        // Key field name first, followed by the qualifier of every cell.
        val schemaArr = cells.toArray.foldLeft(Array(keyFieldName)) {(a, b) => {
          val fieldCell = b.asInstanceOf[Cell]
          a :+ new String(fieldCell.getQualifierArray).substring(fieldCell.getQualifierOffset,
            fieldCell.getQualifierLength + fieldCell.getQualifierOffset)
        }
        }

        val res = Map(schemaArr.zip(splitRec).toArray: _*)

        // Cast each raw value to the data type declared in the schema.
        val recordFields = res.map(value => {
          val colDataType =
            try {
              bcDataSchema.value.fields.filter(_.name == value._1)(0).dataType
            } catch {
              case e: ArrayIndexOutOfBoundsException => throw new RuntimeException("Schema doesn't contain the fieldname")
            }
          CatalystTypeConverters.convertToScala(
            Cast(Literal(value._2), colDataType).eval(),
            colDataType)
        }).toArray
        Row(recordFields: _*)
      }

    rowRdd
  }
}

Thanks
Ravi


Re: OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-06-09 Thread Ted Yu
bq. Read data from hbase using custom DefaultSource (implemented using
TableScan)

Did you use the DefaultSource from hbase-spark module in hbase master
branch ?
If you wrote your own, mind sharing related code ?

Thanks



OutOfMemory when doing joins in spark 2.0 while same code runs fine in spark 1.5.2

2016-06-09 Thread raaggarw
Hi,

I was trying to port my code from Spark 1.5.2 to Spark 2.0, but I ran into
some OutOfMemory issues. On drilling down I could see that the OOM is caused
by a join, because removing the join fixes the issue. I then created a small
Spark app to reproduce this:

(48 cores, 300 GB RAM, divided among 4 workers)

line1: df1 = Read a set of parquet files into dataframe
line2: df1.count
line3: df2 = Read data from hbase using custom DefaultSource (implemented
using TableScan)
line4: df2.count
line5: df3 = df1.join(df2, df1("field1") === df2("field2"), "inner")
line6: df3.count -> *this is where it fails in Spark 2.0 and runs fine in
Spark 1.5.2*

Problem:
First a lot of WARN messages:
2016-06-09 08:14:18,884 WARN  [broadcast-exchange-0]
memory.TaskMemoryManager (TaskMemoryManager.java:allocatePage(264)) - Failed
to allocate a page (1048576 bytes), try again.
And then OOM.

I then tried dumping the data fetched from hbase into s3 and creating df2
from s3 rather than hbase; then it worked fine in Spark 2.0 as well.

Could someone please guide me through next steps?

Thanks
Ravi
Computer Scientist @ Adobe
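
The WARN above is logged from a thread named broadcast-exchange-0, which suggests (but does not prove) that a broadcast was being built for the join. One way to check which join strategy the planner picked is to print the physical plan. The sketch below assumes Spark 2.0 and uses two tiny in-memory DataFrames as stand-ins for the parquet and HBase inputs; the object name, column values, app name and master are all placeholders.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object JoinPlanCheck {
  // Builds the same shape of join as line5 above, on stand-in data.
  def joined(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val df1 = Seq((1, "a"), (2, "b")).toDF("field1", "left_value")
    val df2 = Seq((1, "x"), (3, "y")).toDF("field2", "right_value")
    df1.join(df2, df1("field1") === df2("field2"), "inner")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("join-plan-check") // placeholder name
      .master("local[*]")         // placeholder master
      .getOrCreate()

    val df3 = joined(spark)
    // Prints the physical plan; look for BroadcastHashJoin or SortMergeJoin.
    df3.explain()
    println(df3.count())
    spark.stop()
  }
}
```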




--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/OutOfMemory-when-doing-joins-in-spark-2-0-while-same-code-runs-fine-in-spark-1-5-2-tp27124.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Joins in Spark

2016-03-19 Thread Stuti Awasthi
Hi All,

I have to join 2 files, both not very big (a few MBs only), but the result can be
huge, generating 500 GBs to TBs of data. I have tried using the Spark join()
function, but I'm noticing that the join executes on only 1 or 2 nodes at
most. Since I have a cluster of 5 nodes, I tried passing numTasks=10 to
"join(otherDataset, [numTasks])", but again I noticed that 9 of the tasks
finish instantly and only 1 executor processes all the data.

I searched the internet and found that we can use a broadcast variable to send the data
from 1 file to all nodes and then use a map function to do the join. That way
I should be able to run multiple tasks on different executors.
Now my question is: since Spark provides the join functionality, I had
assumed that it would handle the data parallelism automatically. Does Spark
provide some functionality that I can use directly for the join, rather than
implementing a map-side join using broadcast on my own? Any other better way is
also welcome.

I assume this is a very common problem, and I am looking for
suggestions.

Thanks 
Stuti Awasthi
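
The broadcast approach described above can be sketched as follows. This is plain Scala showing only the per-partition logic of a map-side join; the object and method names are made up for illustration. It keeps every value per key on the small side, since a result far larger than the inputs implies many matches per key.

```scala
object MapSideJoinSketch {
  // Build the lookup once from the smaller input; keep all values per key.
  def buildLookup[K, W](small: Seq[(K, W)]): Map[K, Seq[W]] =
    small.groupBy(_._1).map { case (k, kvs) => k -> kvs.map(_._2) }

  // Join one partition of the other input against the lookup, with no shuffle.
  def joinPartition[K, V, W](part: Iterator[(K, V)], lookup: Map[K, Seq[W]]): Iterator[(K, (V, W))] =
    part.flatMap { case (k, v) =>
      lookup.getOrElse(k, Seq.empty[W]).iterator.map(w => (k, (v, w)))
    }
}
```

In Spark this would roughly become val bc = sc.broadcast(MapSideJoinSketch.buildLookup(smallRdd.collect())) followed by bigRdd.repartition(n).mapPartitions(p => MapSideJoinSketch.joinPartition(p, bc.value)), where smallRdd, bigRdd and n are hypothetical names. The repartition spreads the rows of the second file evenly regardless of key, so the output is produced by all n tasks.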



::DISCLAIMER::


The contents of this e-mail and any attachment(s) are confidential and intended 
for the named recipient(s) only.
E-mail transmission is not guaranteed to be secure or error-free as information 
could be intercepted, corrupted,
lost, destroyed, arrive late or incomplete, or may contain viruses in 
transmission. The e mail and its contents
(with or without referred errors) shall therefore not attach any liability on 
the originator or HCL or its affiliates.
Views or opinions, if any, presented in this email are solely those of the 
author and may not necessarily reflect the
views or opinions of HCL or its affiliates. Any form of reproduction, 
dissemination, copying, disclosure, modification,
distribution and / or publication of this message without the prior written 
consent of authorized representative of
HCL is strictly prohibited. If you have received this email in error please 
delete it and notify the sender immediately.
Before opening any email and/or attachments, please check them for viruses and 
other defects.




Re: Joins in Spark

2016-03-19 Thread Rishi Mishra
My suspicion is that your input files have only a few small partitions, hence only
a small number of tasks are started. Can you provide some more details, such as how
you load the files and how the result size comes to around 500 GB?

Regards,
Rishitesh Mishra,
SnappyData . (http://www.snappydata.io/)

https://in.linkedin.com/in/rishiteshmishra
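
Another possibility, not confirmed anywhere in this thread, is key skew: a shuffle join sends every row with the same key to the same partition, so raising numTasks does not help if most rows share one key. The sketch below models that placement rule (non-negative key hash modulo the partition count) in plain Scala; the object and method names are made up for illustration.

```scala
object PartitionSkewSketch {
  // Same rule as a hash partitioner: non-negative key hash modulo partition count.
  def partitionOf(key: Any, numPartitions: Int): Int =
    Math.floorMod(key.hashCode, numPartitions)

  // Count how many rows land in each partition.
  def rowsPerPartition[K](keys: Seq[K], numPartitions: Int): Map[Int, Int] =
    keys.groupBy(partitionOf(_, numPartitions)).map { case (p, ks) => p -> ks.size }
}
```

With a single distinct key and 10 partitions, rowsPerPartition returns one entry holding all rows, which would match the symptom of 9 tasks finishing instantly.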



Re: Multiple joins in Spark

2015-10-20 Thread Xiao Li
Are you using hiveContext?

First, build your Spark using the following command:
mvn -Pyarn -Phadoop-2.4 -Dhadoop.version=2.4.0 -Phive -Phive-thriftserver -DskipTests clean package

Then, try this sample program

import org.apache.spark.SparkContext

object SimpleApp {
  case class Individual(name: String, surname: String, birthDate: String)

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext("local", "join DFs")
    // val sqlContext = new SQLContext(sc)
    val hiveContext = new org.apache.spark.sql.hive.HiveContext(sc)

    val rdd = sc.parallelize(Seq(
      Individual("a", "c", "10/10/1972"),
      Individual("b", "d", "10/11/1970")
    ))

    val df = hiveContext.createDataFrame(rdd)

    df.registerTempTable("tab")

    val dfHive = hiveContext.sql("select * from tab")

    dfHive.show()
  }
}




Re: Multiple joins in Spark

2015-10-20 Thread Shyam Parimal Katti
When I do the steps above and run a query like this:

sqlContext.sql("select * from ...")

I get exception:

org.apache.spark.sql.AnalysisException: Non-local session path expected to
be non-null;
   at org.apache.spark.sql.hive.HiveQL$.createPlan(HiveQl.scala:260)
   .

I cannot paste the entire stack since it's on a company laptop and I am not
allowed to copy paste things! Though if absolutely needed to help, I can
figure out some way to provide it.



Multiple joins in Spark

2015-10-16 Thread Shyam Parimal Katti
Hello All,

I have a SQL query like this:

select a.a_id, b.b_id, c.c_id from table_a a join table_b b on a.a_id =
b.a_id join table_c c on b.b_id = c.b_id

In Scala I have done this so far:

val table_a_rdd = sc.textFile(...)
val table_b_rdd = sc.textFile(...)
val table_c_rdd = sc.textFile(...)

val table_a_rowRDD = table_a_rdd.map(_.split("\\x07")).map(line =>
  (line(0), line))
val table_b_rowRDD = table_b_rdd.map(_.split("\\x07")).map(line =>
  (line(0), line))
val table_c_rowRDD = table_c_rdd.map(_.split("\\x07")).map(line =>
  (line(0), line))

Each line has its primary key as the first value.

While I can join 2 RDDs using table_a_rowRDD.join(table_b_rowRDD),
is it possible to join multiple RDDs in a single expression, like
table_a_rowRDD.join(table_b_rowRDD).join(table_c_rowRDD)? Also, how can I
specify the column on which to join multiple RDDs?
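
As a rough sketch of how this could look with pair RDDs: join always matches on the key of each (key, value) pair, so the join column is chosen by how each RDD is keyed, and the intermediate result has to be re-keyed before the next join. The row layouts below (a_id in b's second column, b_id in c's second column) and the sample values are assumptions made for illustration only.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object ChainedRddJoin {
  def run(sc: SparkContext): Array[(String, String, String)] = {
    // Hypothetical rows: a = (a_id), b = (b_id, a_id), c = (c_id, b_id).
    val tableA = sc.parallelize(Seq(Array("a1"), Array("a2")))
    val tableB = sc.parallelize(Seq(Array("b1", "a1"), Array("b2", "a2")))
    val tableC = sc.parallelize(Seq(Array("c1", "b1")))

    val aByAId = tableA.map(row => (row(0), row)) // key: a_id
    val bByAId = tableB.map(row => (row(1), row)) // key: a_id (second column of b)
    val cByBId = tableC.map(row => (row(1), row)) // key: b_id (second column of c)

    aByAId.join(bByAId)                           // (a_id, (aRow, bRow))
      .map { case (_, (a, b)) => (b(0), (a, b)) } // re-key by b_id
      .join(cByBId)                               // (b_id, ((aRow, bRow), cRow))
      .map { case (_, ((a, b), c)) => (a(0), b(0), c(0)) }
      .collect()
  }

  def main(args: Array[String]): Unit = {
    // Placeholder app name and master.
    val sc = new SparkContext(new SparkConf().setAppName("chained-rdd-join").setMaster("local[*]"))
    try run(sc).foreach(println) finally sc.stop()
  }
}
```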


Re: Multiple joins in Spark

2015-10-16 Thread Xiao Li
Hi, Shyam,

You still can use SQL to do the same thing in Spark:

For example,

val df1 = sqlContext.createDataFrame(rdd)
val df2 = sqlContext.createDataFrame(rdd2)
val df3 = sqlContext.createDataFrame(rdd3)
df1.registerTempTable("tab1")
df2.registerTempTable("tab2")
df3.registerTempTable("tab3")
val exampleSQL = sqlContext.sql("select * from tab1, tab2, tab3 where tab1.name = tab2.name and tab2.id = tab3.id")

Good luck,

Xiao Li
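
The same three-way join can also be written with the DataFrame API instead of a SQL string. A minimal sketch, assuming the three DataFrames carry the name and id columns used in the query above; the object and method names are made up for illustration.

```scala
import org.apache.spark.sql.DataFrame

object ThreeWayJoin {
  // Assumes df1 and df2 both have a `name` column and df2 and df3 both have
  // an `id` column, matching the column names in the SQL query.
  def joinAll(df1: DataFrame, df2: DataFrame, df3: DataFrame): DataFrame =
    df1.join(df2, df1("name") === df2("name"))
       .join(df3, df2("id") === df3("id"))
}
```

Both forms go through the same optimizer, so the choice between them is mostly a matter of taste.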



Re: Multiple joins in Spark

2015-10-16 Thread Xiao Li
Hi, Shyam,

The method registerTempTable registers a DataFrame as a temporary table in
the Catalog under the given table name.

In the Catalog, Spark maintains a concurrent hashmap that maps table names
to logical plans.

For example, when we submit the following query,

SELECT * FROM inMemoryDF

The concurrent hashmap contains one mapping from the name to the logical plan:

"inMemoryDF" -> "LogicalRDD [c1#0,c2#1,c3#2,c4#3], MapPartitionsRDD[1] at
createDataFrame at SimpleApp.scala:42"

Therefore, using SQL will not hurt your performance. The physical plan that
executes your SQL query is generated from the output of the Catalyst
optimizer.
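
One way to check this is to compare the plans Spark prints for a SQL query and for the equivalent DataFrame call. This is a minimal sketch, assuming Spark 2.x or later, where SparkSession replaces sqlContext and createOrReplaceTempView is the successor of registerTempTable; the table contents and column names are invented for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration only; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("plan-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical tables.
val tab1 = Seq((1, "x"), (2, "y")).toDF("id", "name")
val tab2 = Seq((1, "p"), (3, "q")).toDF("id", "label")

// Registering a view only records the name and the logical plan in the catalog.
tab1.createOrReplaceTempView("tab1")
tab2.createOrReplaceTempView("tab2")

val viaSql = spark.sql(
  "select tab1.id, name, label from tab1 join tab2 on tab1.id = tab2.id")
val viaApi = tab1.join(tab2, "id").select("id", "name", "label")

// Print the physical plan chosen for each formulation.
viaSql.explain()
viaApi.explain()

val sqlRows = viaSql.collect()
  .map(r => (r.getInt(0), r.getString(1), r.getString(2))).toList
val apiRows = viaApi.collect()
  .map(r => (r.getInt(0), r.getString(1), r.getString(2))).toList
```

Both formulations return the same rows, and the printed plans can be compared side by side to see which join strategy was selected.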

Good luck,

Xiao Li



2015-10-16 20:53 GMT-07:00 Shyam Parimal Katti :

> Thanks Xiao! A question about the internals: would you know what happens
> when registerTempTable() is called? I.e., does it create an RDD internally,
> or some internal representation that lets it join with an RDD?
>
> Again, thanks for the answer.


Re: Support for skewed joins in Spark

2015-05-04 Thread ๏̯͡๏
Hello Soila,
Can you share the code that shows your usage of RangePartitioner?
I am facing an issue with .join() where one task runs forever. I tried
repartition(100/200/300/1200) and it did not help. I cannot use a map-side
join because both datasets are huge and exceed the driver memory size.
Regards,
Deepak




-- 
Deepak


Re: Support for skewed joins in Spark

2015-03-12 Thread Shixiong Zhu
I sent a PR to add skewed join last year:
https://github.com/apache/spark/pull/3505
However, it does not split a key across multiple partitions. Instead, if a
key has too many values to fit in memory, it spills the values to disk
temporarily and uses the disk files to do the join.
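
For contrast, splitting a hot key across partitions is usually done by hand with key salting. The sketch below is not what the PR implements; it is a generic technique shown under assumptions: numSalts, the sample data and the local SparkContext are all invented for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import scala.util.Random

// Local context for illustration only; in spark-shell an `sc` already exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("salted-join-sketch"))

// Hypothetical number of sub-keys each key is spread over.
val numSalts = 4

// Skewed side: the key "hot" carries almost all rows.
val large = sc.parallelize(Seq.fill(1000)(("hot", 1)) ++ Seq(("cold", 1)))
val other = sc.parallelize(Seq(("hot", "H"), ("cold", "C")))

// Skewed side: append a random salt so one key hashes to several partitions.
val saltedLarge = large.map { case (k, v) => ((k, Random.nextInt(numSalts)), v) }

// Other side: replicate each row once per salt so every salted key finds its match.
val saltedOther = other.flatMap { case (k, w) =>
  (0 until numSalts).map(salt => ((k, salt), w))
}

// Join on (key, salt), then drop the salt again.
val joined = saltedLarge.join(saltedOther)
  .map { case ((k, _), (v, w)) => (k, (v, w)) }

val countsByKey = joined.countByKey()
```

The trade-off is that the non-salted side grows by a factor of numSalts, so this helps most when the skew sits mainly on one side of the join.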

Best Regards,
Shixiong Zhu





Re: Support for skewed joins in Spark

2015-03-12 Thread Soila Pertet Kavulya
Thanks Shixiong,

I'll try out your PR. Do you know what its status is? Are there any
plans to incorporate this change into DataFrames/SchemaRDDs in
Spark 1.3?

Soila




Support for skewed joins in Spark

2015-03-12 Thread Soila Pertet Kavulya
Does Spark support skewed joins similar to Pig's, which distribute large
keys over multiple partitions? I tried using the RangePartitioner, but
I am still experiencing failures because some keys are too large to
fit in a single partition. I cannot use broadcast variables to
work around this because both RDDs are too large to fit in driver
memory.

-
To unsubscribe, e-mail: user-unsubscr...@spark.apache.org
For additional commands, e-mail: user-h...@spark.apache.org



Joins in Spark

2014-12-22 Thread Deep Pradhan
Hi,
I have two RDDs, vertices and edges. Vertices is a plain RDD and edges is a
pair RDD. I want to take a three-way join of these two. Joins work only when
both RDDs are pair RDDs, right? So how am I supposed to take a three-way
join of these RDDs?

Thank You


Re: Joins in Spark

2014-12-22 Thread madhu phatak
Hi,
You can map your vertices RDD as follows:

val pairVertices = verticesRDD.map(vertex => (vertex, null))

This gives you a pair RDD. After the join, make sure that you remove the
superfluous null value.
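
A sketch of how this could look end to end, under one interpretation of the three-way join: match each edge against the vertex set once by its source and once by its destination. The sample data, the Unit placeholder (used here in place of null) and the local SparkContext are assumptions made for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Local context for illustration only; in spark-shell an `sc` already exists.
val sc = SparkContext.getOrCreate(
  new SparkConf().setMaster("local[*]").setAppName("vertex-edge-join-sketch"))

// Hypothetical graph: vertex "d" is referenced by an edge but is not a known vertex.
val vertices = sc.parallelize(Seq("a", "b", "c"))
val edges = sc.parallelize(Seq(("a", "b"), ("b", "c"), ("a", "d"))) // (src, dst)

// Turn the plain RDD into a pair RDD with a placeholder value.
val pairVertices = vertices.map(v => (v, ()))

// First join: keep edges whose source is a known vertex, then re-key by destination.
val bySrc = edges.join(pairVertices)
  .map { case (src, (dst, _)) => (dst, src) }

// Second join: keep edges whose destination is also a known vertex,
// dropping the placeholder again.
val bothKnown = bySrc.join(pairVertices)
  .map { case (dst, (src, _)) => (src, dst) }

val result = bothKnown.collect().toList.sorted
```

The vertex RDD takes part in two joins, each on a different column of the edge RDD, which is why the intermediate result is re-keyed in between.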




-- 
Regards,
Madhukara Phatak
http://www.madhukaraphatak.com


Joins in Spark

2014-12-22 Thread pradhandeep
Hi,
I have two RDDs: vertices, which is a plain RDD, and edges, which is a pair
RDD. I have to do a three-way join of these two. Joins work only when both
RDDs are pair RDDs, so how can we perform a three-way join of these RDDs?

Thank You



--
View this message in context: 
http://apache-spark-user-list.1001560.n3.nabble.com/Joins-in-Spark-tp20819.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.




Fwd: Joins in Spark

2014-12-22 Thread Deep Pradhan
This gives me two pair RDDs: the edgesRDD, and the verticesRDD with each
vertex padded with the value null. But I have to take a three-way join of
these two RDDs, and they have only one attribute in common. How can I go
about doing the three-way join?