Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hi Gerard,

"If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination`  should solve the problem."

I tried literally everything, but nothing worked out.

1) Tried nc on two different ports for two different streams; still nothing
worked.

2) Tried the same using Kafka with awaitAnyTermination; still no use, as the
first stream's write kept blocking the second... (And inner queries with
aggregation don't seem to work in Structured Streaming, since each one
expects a separate writeStream.start().)

Any insight (or a direct update to the code) would be helpful.

Thanks,
Aakash.

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Gerard Maas
Aakash,

There are two issues here.
The issue with the code in your first question is that the first query
blocks, so the code for the second one never executes. Panagiotis
pointed this out correctly.
In the updated code, the issue is related to netcat (nc) and the way
Structured Streaming works. As far as I remember, netcat only delivers data
to the first network connection.
On the Structured Streaming side, each query opens its own connection, so
only the first query receives the data.
If you talked to a TCP server that supports multiple connected clients,
you would see data in both queries.
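
For illustration, here is a minimal broadcast-server sketch in Python (my
assumptions: Python 3, localhost, port 9998; untested here) that, unlike nc,
repeats every line typed on stdin to all connected clients, so each query's
connection receives the same rows:

# broadcast_server.py -- a stand-in for `nc -lk 9998` that sends every
# stdin line to ALL connected clients (each streaming query opens its
# own socket connection, so each query sees the data).
import socket
import sys
import threading

clients = []
lock = threading.Lock()

def accept_loop(server):
    # Keep accepting clients and remember their sockets.
    while True:
        conn, _ = server.accept()
        with lock:
            clients.append(conn)

server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(("localhost", 9998))
server.listen(5)
threading.Thread(target=accept_loop, args=(server,), daemon=True).start()

for line in sys.stdin:  # type rows here, e.g. "1,2"
    with lock:
        for c in clients[:]:
            try:
                c.sendall(line.encode())
            except OSError:  # client disconnected; drop it
                clients.remove(c)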

If your actual source is Kafka, the original solution of using
`spark.streams.awaitAnyTermination` should solve the problem.

-kr, Gerard.



Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
You could have a really large window.
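
For example (a sketch only; "ts" is a hypothetical processing-time column
added with current_timestamp(), since the stream in this thread has no
timestamp of its own):

from pyspark.sql.functions import avg, col, current_timestamp, window

# Tag each row with a processing-time timestamp, then aggregate over a
# window so large that it effectively spans the whole stream.
tagged = id_DF.withColumn("ts", current_timestamp())
near_global_avg = tagged.groupBy(window(col("ts"), "365 days")) \
    .agg(avg("col1").alias("aver"))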

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
If I use timestamp-based windowing, then my average will not be a global
average but one grouped by timestamp, which is not my requirement. I want to
recalculate the average of the entire column every time a new row (or rows)
comes in, and divide the other column by the updated average.

Let me know in case you or anyone else has a solution for this.

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Lalwani, Jayesh
You could do it if you had a timestamp in your data. You can use windowed
operations to divide a value by its own average over a window. However, in
Structured Streaming, you can only window by timestamp columns. You cannot do
window aggregations on integers.
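
In sketch form, assuming the stream had an event-time column (here a
hypothetical "ts"; the code in this thread has none, which is exactly the
limitation):

from pyspark.sql.functions import avg, col, window

# Per-window average of col1 over 10-minute event-time windows.
win_avg = data \
    .groupBy(window(col("ts"), "10 minutes")) \
    .agg(avg("col1").alias("aver"))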

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-16 Thread Aakash Basu
Hey Jayesh and Others,

Is there, then, any other way to arrive at a solution for this use case?

Thanks,
Aakash.

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-15 Thread Lalwani, Jayesh
Note that what you are trying to do here is join a streaming data frame with an
aggregated streaming data frame. As per the documentation, joining an
aggregated streaming data frame with another streaming data frame is not
supported.
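
Concretely, the unsupported shape is something like this (a sketch; "stream"
stands for any streaming DataFrame):

aggregated = stream.groupBy().avg("col1")  # aggregated streaming DataFrame
joined = stream.crossJoin(aggregated)      # rejected when the query starts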


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-13 Thread spark receiver
Hi Panagiotis,

Wondering whether you solved the problem or not, because I ran into the same
issue today. I'd appreciate it very much if you could paste the code snippet
if it's working.

Thanks.


Re: [Structured Streaming] More than 1 streaming in a code

2018-04-06 Thread Aakash Basu
Hi Panagiotis,

I did that, but it still prints the result of the first query and waits for
new data; it doesn't even go on to the next one.

*Data -*

$ nc -lk 9998

1,2
3,4
5,6
7,8

*Result -*

---
Batch: 0
---
++
|aver|
++
| 3.0|
++

---
Batch: 1
---
++
|aver|
++
| 4.0|
++


*Updated Code -*

from pyspark.sql import SparkSession
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

data = spark \
    .readStream \
    .format("socket") \
    .option("header", "true") \
    .option("host", "localhost") \
    .option("port", 9998) \
    .load("csv")

id_DF = data.select(split(data.value, ",").getItem(0).alias("col1"),
                    split(data.value, ",").getItem(1).alias("col2"))

id_DF.createOrReplaceTempView("ds")

df = spark.sql("select avg(col1) as aver from ds")

df.createOrReplaceTempView("abcd")

wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from ds")

query2 = df \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .trigger(processingTime='5 seconds') \
    .start()

query = wordCounts \
    .writeStream \
    .format("console") \
    .trigger(processingTime='5 seconds') \
    .start()

spark.streams.awaitAnyTermination()



Thanks,
Aakash.

Re: [Structured Streaming] More than 1 streaming in a code

2018-04-06 Thread Panagiotis Garefalakis
Hello Aakash,

When you use query.awaitTermination you are pretty much blocking there,
waiting for the current query to stop or throw an exception. In your case the
second query will not even start.
What you could do instead is remove all the blocking calls and use
spark.streams.awaitAnyTermination (waiting for either query1 or query2 to
terminate). Make sure you do that after the query2.start call.
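
In skeleton form (a sketch, reusing the names from your code):

query1 = df.writeStream.format("console").outputMode("complete").start()
query2 = wordCounts.writeStream.format("console").start()

# Both queries are now running; block until either one terminates.
spark.streams.awaitAnyTermination()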

I hope this helps.

Cheers,
Panagiotis

Fwd: [Structured Streaming] More than 1 streaming in a code

2018-04-06 Thread Aakash Basu
Any help?

I need urgent help. Could someone please clarify the doubt?

[Structured Streaming] More than 1 streaming in a code

2018-04-05 Thread Aakash Basu
Hi,

If I have more than one writeStream in my code, both operating on the same
readStream data, why does only the first writeStream produce output? I want
the second one to be printed on the console as well.

How can I do that?

from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

class test:

    spark = SparkSession.builder \
        .appName("Stream_Col_Oper_Spark") \
        .getOrCreate()

    data = spark.readStream.format("kafka") \
        .option("startingOffsets", "latest") \
        .option("kafka.bootstrap.servers", "localhost:9092") \
        .option("subscribe", "test1") \
        .load()

    ID = data.select('value') \
        .withColumn('value', data.value.cast("string")) \
        .withColumn("Col1", split(col("value"), ",").getItem(0)) \
        .withColumn("Col2", split(col("value"), ",").getItem(1)) \
        .drop('value')

    ID.createOrReplaceTempView("transformed_Stream_DF")

    df = spark.sql("select avg(col1) as aver from transformed_Stream_DF")

    df.createOrReplaceTempView("abcd")

    wordCounts = spark.sql("Select col1, col2, col2/(select aver from abcd) col3 from transformed_Stream_DF")

    # ---#

    query1 = df \
        .writeStream \
        .format("console") \
        .outputMode("complete") \
        .trigger(processingTime='3 seconds') \
        .start()

    query1.awaitTermination()  # blocks here, so query2 below never starts
    # ---#

    query2 = wordCounts \
        .writeStream \
        .format("console") \
        .trigger(processingTime='3 seconds') \
        .start()

    query2.awaitTermination()

# /home/kafka/Downloads/spark-2.3.0-bin-hadoop2.7/bin/spark-submit
#   --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0,com.databricks:spark-csv_2.10:1.0.3
#   /home/aakashbasu/PycharmProjects/AllMyRnD/Kafka_Spark/Stream_Col_Oper_Spark.py




Thanks,
Aakash.