Re: Union of 2 streaming data frames

2017-07-10 Thread Michael Armbrust
As I said in the voting thread:

This vote passes! I'll followup with the release on Monday.



On Mon, Jul 10, 2017 at 10:55 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> Michael,
>
>
>
> I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is
> going to be out soon? Do you have some sort of ETA?
>
>
>
> *From: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Date: *Friday, July 7, 2017 at 5:46 PM
> *To: *Michael Armbrust <mich...@databricks.com>
>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> mm-heartb...@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Great! Even, *val **dfAllEvents =
> sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) 
> *doesn’t
> work. Will this be addressed in 2.2?
>
>
>
>
>
> *From: *Michael Armbrust <mich...@databricks.com>
> *Date: *Friday, July 7, 2017 at 5:42 PM
> *To: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat <
> mm-heartb...@capitalone.com>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> Ah, looks like you are hitting SPARK-20441
> <https://issues.apache.org/jira/browse/SPARK-20441>.  Should be fixed in
> 2.2.
>
>
>
> On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> I created a small sample code to verify this. It looks like union using
> Spark SQL doesn’t work. Calling union on dataframe works.
> https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545. I’m
> on 2.1.0
>
>
>
> I get the following exception. If I change val dfAllEvents =
> sparkSession.sql("select * from oldEvents union select * from newEvents")
> to val dfAllEvents = dfNewEvents.union(dfOldEvents) it works fine
>
>
>
> 17/07/07 17:33:34 ERROR StreamExecution: Query [id =
> 3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 
> 063af01f-9878-452e-aa30-7c21e2ef4c18]
> terminated with error
>
> org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29
> missing from 
> eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L
> in operator !Join Inner, (acctId#0 = acctId#29);;
>
> Distinct
>
> +- Union
>
>:- Project [acctId#0, eventId#1L, eventType#2]
>
>:  +- SubqueryAlias oldevents, `oldEvents`
>
>: +- Project [acctId#0, eventId#1L, eventType#2]
>
>   :+- !Join Inner, (acctId#0 = acctId#29)
>
>:   :- SubqueryAlias alloldevents, `allOldEvents`
>
>:   :  +- Relation[acctId#0,eventId#1L,eventType#2] json
>
>:   +- SubqueryAlias newevents, `newEvents`
>
>:  +- Relation[acctId#36,eventId#37L,eventType#38] json
>
>+- Project [acctId#29, eventId#30L, eventType#31]
>
>   +- SubqueryAlias newevents, `newEvents`
>
>  +- Relation[acctId#29,eventId#30L,eventType#31] json
>
>
>
> at org.apache.spark.sql.catalyst.
> analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
>
> at org.apache.spark.sql.catalyst.analysis.Analyzer.
> failAnalysis(Analyzer.scala:57)
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
>
> at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$
> anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:128)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at scala.collection.immutable.List.foreach(List.scala:381)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode.scala:127)
>
> at scala.collection.immutable.List.foreach(List.scala:381)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(
> TreeNode.scala:127)
>
> at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$
> foreachUp$1.apply(TreeNode

Re: Union of 2 streaming data frames

2017-07-10 Thread Lalwani, Jayesh
Michael,

I see that 2.2 RC6 has passed a vote on Friday. Does this mean 2.2 is going to 
be out soon? Do you have some sort of ETA?

From: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Date: Friday, July 7, 2017 at 5:46 PM
To: Michael Armbrust <mich...@databricks.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat 
<mm-heartb...@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Great! Even, val dfAllEvents = 
sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) doesn’t 
work. Will this be addressed in 2.2?


From: Michael Armbrust <mich...@databricks.com>
Date: Friday, July 7, 2017 at 5:42 PM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat 
<mm-heartb...@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Ah, looks like you are hitting 
SPARK-20441<https://issues.apache.org/jira/browse/SPARK-20441>.  Should be 
fixed in 2.2.

On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh 
<jayesh.lalw...@capitalone.com<mailto:jayesh.lalw...@capitalone.com>> wrote:
I created a small sample code to verify this. It looks like union using Spark 
SQL doesn’t work. Calling union on dataframe works. 
https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545. I’m on 
2.1.0

I get the following exception. If I change val dfAllEvents = 
sparkSession.sql("select * from oldEvents union select * from newEvents") to 
val dfAllEvents = dfNewEvents.union(dfOldEvents) it works fine

17/07/07 17:33:34 ERROR StreamExecution: Query [id = 
3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 
063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing 
from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L in 
operator !Join Inner, (acctId#0 = acctId#29);;
Distinct
+- Union
   :- Project [acctId#0, eventId#1L, eventType#2]
   :  +- SubqueryAlias oldevents, `oldEvents`
   : +- Project [acctId#0, eventId#1L, eventType#2]
  :+- !Join Inner, (acctId#0 = acctId#29)
   :   :- SubqueryAlias alloldevents, `allOldEvents`
   :   :  +- Relation[acctId#0,eventId#1L,eventType#2] json
   :   +- SubqueryAlias newevents, `newEvents`
   :  +- Relation[acctId#36,eventId#37L,eventType#38] json
   +- Project [acctId#29, eventId#30L, eventType#31]
  +- SubqueryAlias newevents, `newEvents`
 +- Relation[acctId#29,eventId#30L,eventType#31] json

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst

Re: Union of 2 streaming data frames

2017-07-07 Thread Lalwani, Jayesh
Great! Even, val dfAllEvents = 
sparkSession.table("oldEvents").union(sparkSession.table("newEvents")) doesn’t 
work. Will this be addressed in 2.2?


From: Michael Armbrust <mich...@databricks.com>
Date: Friday, July 7, 2017 at 5:42 PM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>, #MM - Heartbeat 
<mm-heartb...@capitalone.com>
Subject: Re: Union of 2 streaming data frames

Ah, looks like you are hitting 
SPARK-20441<https://issues.apache.org/jira/browse/SPARK-20441>.  Should be 
fixed in 2.2.

On Fri, Jul 7, 2017 at 2:37 PM, Lalwani, Jayesh 
<jayesh.lalw...@capitalone.com<mailto:jayesh.lalw...@capitalone.com>> wrote:
I created a small sample code to verify this. It looks like union using Spark 
SQL doesn’t work. Calling union on dataframe works. 
https://gist.github.com/GaalDornick/8920577ca92842f44d7bfd3a277c7545. I’m on 
2.1.0

I get the following exception. If I change val dfAllEvents = 
sparkSession.sql("select * from oldEvents union select * from newEvents") to 
val dfAllEvents = dfNewEvents.union(dfOldEvents) it works fine

17/07/07 17:33:34 ERROR StreamExecution: Query [id = 
3bae26a1-7ee3-45ab-a98d-9346eaf03d08, runId = 
063af01f-9878-452e-aa30-7c21e2ef4c18] terminated with error
org.apache.spark.sql.AnalysisException: resolved attribute(s) acctId#29 missing 
from eventType#2,acctId#0,eventId#37L,acctId#36,eventType#38,eventId#1L in 
operator !Join Inner, (acctId#0 = acctId#29);;
Distinct
+- Union
   :- Project [acctId#0, eventId#1L, eventType#2]
   :  +- SubqueryAlias oldevents, `oldEvents`
   : +- Project [acctId#0, eventId#1L, eventType#2]
  :+- !Join Inner, (acctId#0 = acctId#29)
   :   :- SubqueryAlias alloldevents, `allOldEvents`
   :   :  +- Relation[acctId#0,eventId#1L,eventType#2] json
   :   +- SubqueryAlias newevents, `newEvents`
   :  +- Relation[acctId#36,eventId#37L,eventType#38] json
   +- Project [acctId#29, eventId#30L, eventType#31]
  +- SubqueryAlias newevents, `newEvents`
 +- Relation[acctId#29,eventId#30L,eventType#31] json

at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.failAnalysis(CheckAnalysis.scala:40)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.failAnalysis(Analyzer.scala:57)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:337)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1.apply(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:128)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:127)
at scala.collection.immutable.List.foreach(List.scala:381)
at 
org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
at 
org.apache.spark.sql.catalyst.analysis.CheckAnalysis$class.checkAnalysis(CheckAnalysis.scala:67)
at 
org.apache.spark.sql.catalyst.analysis.Analyzer.checkAnalysis(Analyzer.scala:57)

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
alysis.Analyzer.
> checkAnalysis(Analyzer.scala:57)
>
> at org.apache.spark.sql.execution.QueryExecution.
> assertAnalyzed(QueryExecution.scala:48)
>
> at org.apache.spark.sql.execution.QueryExecution.
> withCachedData$lzycompute(QueryExecution.scala:68)
>
> at org.apache.spark.sql.execution.QueryExecution.
> withCachedData(QueryExecution.scala:67)
>
> at org.apache.spark.sql.execution.streaming.
> IncrementalExecution.optimizedPlan$lzycompute(
> IncrementalExecution.scala:60)
>
> at org.apache.spark.sql.execution.streaming.
> IncrementalExecution.optimizedPlan(IncrementalExecution.scala:60)
>
> at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan$lzycompute(QueryExecution.scala:79)
>
> at org.apache.spark.sql.execution.QueryExecution.
> sparkPlan(QueryExecution.scala:75)
>
> at org.apache.spark.sql.execution.QueryExecution.
> executedPlan$lzycompute(QueryExecution.scala:84)
>
> at org.apache.spark.sql.execution.QueryExecution.
> executedPlan(QueryExecution.scala:84)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
>
> at org.apache.spark.sql.execution.streaming.
> ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.org$apache$spark$sql$execution$streaming$
> StreamExecution$$runBatch(StreamExecution.scala:488)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$
> mcV$sp(StreamExecution.scala:255)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.reportTimeTaken(StreamExecution.scala:46)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anonfun$org$apache$spark$sql$execution$
> streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(
> StreamExecution.scala:244)
>
> at org.apache.spark.sql.execution.streaming.
> ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution.org$apache$spark$sql$execution$streaming$
> StreamExecution$$runBatches(StreamExecution.scala:239)
>
> at org.apache.spark.sql.execution.streaming.
> StreamExecution$$anon$1.run(StreamExecution.scala:177)
>
>
>
>
>
>
>
>
>
> *From: *Michael Armbrust <mich...@databricks.com>
> *Date: *Friday, July 7, 2017 at 2:30 PM
> *To: *"Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
> *Cc: *"user@spark.apache.org" <user@spark.apache.org>
> *Subject: *Re: Union of 2 streaming data frames
>
>
>
> df.union(df2) should be supported when both DataFrames are created from a
> streaming source.  What error are you seeing?
>
>
>
> On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <
> jayesh.lalw...@capitalone.com> wrote:
>
> In structured streaming, Is there a way to Union 2 streaming data frames?
> Are there any plans to support Union of 2 streaming dataframes soon? I can
> understand the inherent complexity in joining 2 streaming data frames. But,
> Union is  just concatenating 2 microbatches, innit?
>
>
>
> The problem that we are trying to solve is that we have a Kafka stream
> that is receiving events. Each event is assosciated with an account ID. We
> have a data store that stores historical  events for hundreds of millions
> of accounts. What we want to do is for the events coming in the input
> stre

Re: Union of 2 streaming data frames

2017-07-07 Thread Lalwani, Jayesh
n.scala:75)
at 
org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:84)
at 
org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:84)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:496)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch$3.apply(StreamExecution.scala:488)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatch(StreamExecution.scala:488)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply$mcV$sp(StreamExecution.scala:255)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1$$anonfun$1.apply(StreamExecution.scala:244)
at 
org.apache.spark.sql.execution.streaming.ProgressReporter$class.reportTimeTaken(ProgressReporter.scala:262)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.reportTimeTaken(StreamExecution.scala:46)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anonfun$org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches$1.apply$mcZ$sp(StreamExecution.scala:244)
at 
org.apache.spark.sql.execution.streaming.ProcessingTimeExecutor.execute(TriggerExecutor.scala:43)
at 
org.apache.spark.sql.execution.streaming.StreamExecution.org$apache$spark$sql$execution$streaming$StreamExecution$$runBatches(StreamExecution.scala:239)
at 
org.apache.spark.sql.execution.streaming.StreamExecution$$anon$1.run(StreamExecution.scala:177)




From: Michael Armbrust <mich...@databricks.com>
Date: Friday, July 7, 2017 at 2:30 PM
To: "Lalwani, Jayesh" <jayesh.lalw...@capitalone.com>
Cc: "user@spark.apache.org" <user@spark.apache.org>
Subject: Re: Union of 2 streaming data frames

df.union(df2) should be supported when both DataFrames are created from a 
streaming source.  What error are you seeing?

On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh 
<jayesh.lalw...@capitalone.com<mailto:jayesh.lalw...@capitalone.com>> wrote:
In structured streaming, Is there a way to Union 2 streaming data frames? Are 
there any plans to support Union of 2 streaming dataframes soon? I can 
understand the inherent complexity in joining 2 streaming data frames. But, 
Union is  just concatenating 2 microbatches, innit?

The problem that we are trying to solve is that we have a Kafka stream that is 
receiving events. Each event is assosciated with an account ID. We have a data 
store that stores historical  events for hundreds of millions of accounts. What 
we want to do is for the events coming in the input stream, we want to add in 
all the historical events from the data store and give it to a model.

Initially, the way we were planning to do this is
a) read from Kafka into a streaming dataframe. Call this inputDF.
b) In a mapWithPartition method, get all the unique accounts in the partition. 
Look up all the historical events for those unique accounts and return them. 
Let’s call this historicalDF
c) Union inputDF with historicalDF. Call this allDF
d) Call mapWithPartition on allDF and give the records to the model

Of course, this doesn’t work because both inputDF and historicalDF are 
streaming data frames.

What we ended up doing is in step b) we output the input records with the 
historical records, which works but seems like a hacky way of doing things. The 
operation that does lookup does union too. This works for now because the data 
from the data store doesn’t require any transformation or aggregation. But, if 
it did, we would like to do that using Spark SQL, whereas this solution forces 
us to doing any transformation of historical data in Scala

Is there a Sparky way of doing this?



The information contained in this e-mail is confidential and/or proprietary to 
Capital One and/or its affiliates and may only be used solely in performance of 
work or services for Capital One. The information transmitted herewith is 
intended only for use by the i

Re: Union of 2 streaming data frames

2017-07-07 Thread Michael Armbrust
df.union(df2) should be supported when both DataFrames are created from a
streaming source.  What error are you seeing?

On Fri, Jul 7, 2017 at 11:27 AM, Lalwani, Jayesh <
jayesh.lalw...@capitalone.com> wrote:

> In structured streaming, Is there a way to Union 2 streaming data frames?
> Are there any plans to support Union of 2 streaming dataframes soon? I can
> understand the inherent complexity in joining 2 streaming data frames. But,
> Union is  just concatenating 2 microbatches, innit?
>
>
>
> The problem that we are trying to solve is that we have a Kafka stream
> that is receiving events. Each event is assosciated with an account ID. We
> have a data store that stores historical  events for hundreds of millions
> of accounts. What we want to do is for the events coming in the input
> stream, we want to add in all the historical events from the data store and
> give it to a model.
>
>
>
> Initially, the way we were planning to do this is
> a) read from Kafka into a streaming dataframe. Call this inputDF.
> b) In a mapWithPartition method, get all the unique accounts in the
> partition. Look up all the historical events for those unique accounts and
> return them. Let’s call this historicalDF
>
> c) Union inputDF with historicalDF. Call this allDF
>
> d) Call mapWithPartition on allDF and give the records to the model
>
>
>
> Of course, this doesn’t work because both inputDF and historicalDF are
> streaming data frames.
>
>
>
> What we ended up doing is in step b) we output the input records with the
> historical records, which works but seems like a hacky way of doing things.
> The operation that does lookup does union too. This works for now because
> the data from the data store doesn’t require any transformation or
> aggregation. But, if it did, we would like to do that using Spark SQL,
> whereas this solution forces us to doing any transformation of historical
> data in Scala
>
>
>
> Is there a Sparky way of doing this?
>
>
> --
>
> The information contained in this e-mail is confidential and/or
> proprietary to Capital One and/or its affiliates and may only be used
> solely in performance of work or services for Capital One. The information
> transmitted herewith is intended only for use by the individual or entity
> to which it is addressed. If the reader of this message is not the intended
> recipient, you are hereby notified that any review, retransmission,
> dissemination, distribution, copying or other use of, or taking of any
> action in reliance upon this information is strictly prohibited. If you
> have received this communication in error, please contact the sender and
> delete the material from your computer.
>